Move towards using external binaries / RPC plugins

- First RPC steps

- Work on some flaws in RPC model

- Remove unused TLS settings from Engine and Swarm options

- Add code to correctly encode data over the network

- Add client driver for RPC

- Rename server driver file

- Start to make marshal make sense

- Fix silly RPC method args and add client

- Fix some issues with RPC calls, and marshaling

- Simplify plugin main.go

- Move towards 100% plugin in CLI

- Ensure that plugin servers are cleaned up properly

- Make flag parsing for driver flags work properly

Includes some work carried from @dmp42 updating the build process and
tests to use the new method.

Signed-off-by: Nathan LeClaire <nathan.leclaire@gmail.com>
This commit is contained in:
Nathan LeClaire
2015-09-02 13:42:13 -07:00
parent a63f26f438
commit c8edb33ecd
113 changed files with 6295 additions and 1057 deletions

View File

@@ -0,0 +1,213 @@
package localbinary
import (
"bufio"
"fmt"
"io"
"os"
"os/exec"
"strings"
"time"
"github.com/docker/machine/libmachine/log"
)
var (
// Timeout where we will bail if we're not able to properly contact the
// plugin server.
defaultTimeout = 10 * time.Second
)
const (
pluginOutPrefix = "(%s) OUT | "
pluginErrPrefix = "(%s) DBG | "
PluginEnvKey = "MACHINE_PLUGIN_TOKEN"
PluginEnvVal = "42"
)
type PluginStreamer interface {
// Return a channel for receiving the output of the stream line by
// line, and a channel for stopping the stream when we are finished
// reading from it.
//
// It happens to be the case that we do this all inside of the main
// plugin struct today, but that may not be the case forever.
AttachStream(*bufio.Scanner) (<-chan string, chan<- bool)
}
type PluginServer interface {
// Get the address where the plugin server is listening.
Address() (string, error)
// Serve kicks off the plugin server.
Serve() error
// Close shuts down the initialized server.
Close() error
}
type McnBinaryExecutor interface {
// Execute the driver plugin. Returns scanners for plugin binary
// stdout and stderr.
Start() (*bufio.Scanner, *bufio.Scanner, error)
// Stop reading from the plugins in question.
Close() error
}
// DriverPlugin interface wraps the underlying mechanics of starting a driver
// plugin server and then figuring out where it can be dialed.
type DriverPlugin interface {
PluginServer
PluginStreamer
}
type LocalBinaryPlugin struct {
Executor McnBinaryExecutor
Addr string
MachineName string
addrCh chan string
stopCh chan bool
}
type LocalBinaryExecutor struct {
pluginStdout, pluginStderr io.ReadCloser
DriverName string
}
func NewLocalBinaryPlugin(driverName string) *LocalBinaryPlugin {
return &LocalBinaryPlugin{
stopCh: make(chan bool, 1),
addrCh: make(chan string, 1),
Executor: &LocalBinaryExecutor{
DriverName: driverName,
},
}
}
func (lbe *LocalBinaryExecutor) Start() (*bufio.Scanner, *bufio.Scanner, error) {
log.Debugf("Launching plugin server for driver %s", lbe.DriverName)
binaryPath, err := exec.LookPath(fmt.Sprintf("docker-machine-driver-%s", lbe.DriverName))
if err != nil {
return nil, nil, fmt.Errorf("Driver %q not found. Do you have the plugin binary accessible in your PATH?", lbe.DriverName)
}
log.Debugf("Found binary path at %s", binaryPath)
cmd := exec.Command(binaryPath)
lbe.pluginStdout, err = cmd.StdoutPipe()
if err != nil {
return nil, nil, fmt.Errorf("Error getting cmd stdout pipe: %s", err)
}
lbe.pluginStderr, err = cmd.StderrPipe()
if err != nil {
return nil, nil, fmt.Errorf("Error getting cmd stderr pipe: %s", err)
}
outScanner := bufio.NewScanner(lbe.pluginStdout)
errScanner := bufio.NewScanner(lbe.pluginStderr)
os.Setenv(PluginEnvKey, PluginEnvVal)
if err := cmd.Start(); err != nil {
return nil, nil, fmt.Errorf("Error starting plugin binary: %s", err)
}
return outScanner, errScanner, nil
}
func (lbe *LocalBinaryExecutor) Close() error {
if err := lbe.pluginStdout.Close(); err != nil {
return err
}
if err := lbe.pluginStderr.Close(); err != nil {
return err
}
return nil
}
func stream(scanner *bufio.Scanner, streamOutCh chan<- string, stopCh <-chan bool) {
for scanner.Scan() {
select {
case <-stopCh:
close(streamOutCh)
return
default:
streamOutCh <- strings.Trim(scanner.Text(), "\n")
if err := scanner.Err(); err != nil {
log.Warnf("Scanning stream: %s", err)
}
}
}
}
func (lbp *LocalBinaryPlugin) AttachStream(scanner *bufio.Scanner) (<-chan string, chan<- bool) {
streamOutCh := make(chan string)
stopCh := make(chan bool, 1)
go stream(scanner, streamOutCh, stopCh)
return streamOutCh, stopCh
}
func (lbp *LocalBinaryPlugin) execServer() error {
outScanner, errScanner, err := lbp.Executor.Start()
if err != nil {
return err
}
// Scan just one line to get the address, then send it to the relevant
// channel.
outScanner.Scan()
addr := outScanner.Text()
if err := outScanner.Err(); err != nil {
return fmt.Errorf("Reading plugin address failed: %s", err)
}
lbp.addrCh <- strings.TrimSpace(addr)
stdOutCh, stopStdoutCh := lbp.AttachStream(outScanner)
stdErrCh, stopStderrCh := lbp.AttachStream(errScanner)
for {
select {
case out := <-stdOutCh:
log.Debug(fmt.Sprintf(pluginOutPrefix, lbp.MachineName), out)
case err := <-stdErrCh:
log.Debug(fmt.Sprintf(pluginErrPrefix, lbp.MachineName), err)
case _ = <-lbp.stopCh:
stopStdoutCh <- true
stopStderrCh <- true
if err := lbp.Executor.Close(); err != nil {
return fmt.Errorf("Error closing local plugin binary: %s", err)
}
return nil
}
}
}
func (lbp *LocalBinaryPlugin) Serve() error {
return lbp.execServer()
}
func (lbp *LocalBinaryPlugin) Address() (string, error) {
if lbp.Addr == "" {
select {
case lbp.Addr = <-lbp.addrCh:
log.Debugf("Plugin server listening at address %s", lbp.Addr)
close(lbp.addrCh)
return lbp.Addr, nil
case <-time.After(defaultTimeout):
return "", fmt.Errorf("Failed to dial the plugin server in %s", defaultTimeout)
}
}
return lbp.Addr, nil
}
func (lbp *LocalBinaryPlugin) Close() error {
lbp.stopCh <- true
return nil
}

View File

@@ -0,0 +1,155 @@
package localbinary
import (
"bufio"
"fmt"
"io"
"os"
"testing"
"time"
"github.com/docker/machine/libmachine/log"
)
type FakeExecutor struct {
stdout, stderr io.ReadCloser
closed bool
}
func (fe *FakeExecutor) Start() (*bufio.Scanner, *bufio.Scanner, error) {
return bufio.NewScanner(fe.stdout), bufio.NewScanner(fe.stderr), nil
}
func (fe *FakeExecutor) Close() error {
fe.closed = true
return nil
}
func TestLocalBinaryPluginAddress(t *testing.T) {
lbp := &LocalBinaryPlugin{}
expectedAddr := "127.0.0.1:12345"
lbp.addrCh = make(chan string, 1)
lbp.addrCh <- expectedAddr
// Call the first time to read from the channel
addr, err := lbp.Address()
if err != nil {
t.Fatalf("Expected no error, instead got %s", err)
}
if addr != expectedAddr {
t.Fatal("Expected did not match actual address")
}
// Call the second time to read the "cached" address value
addr, err = lbp.Address()
if err != nil {
t.Fatalf("Expected no error, instead got %s", err)
}
if addr != expectedAddr {
t.Fatal("Expected did not match actual address")
}
}
func TestLocalBinaryPluginAddressTimeout(t *testing.T) {
if testing.Short() {
t.Skip("Skipping timeout test")
}
lbp := &LocalBinaryPlugin{}
lbp.addrCh = make(chan string, 1)
go func() {
_, err := lbp.Address()
if err == nil {
t.Fatalf("Expected to get a timeout error, instead got %s", err)
}
}()
time.Sleep(defaultTimeout + 1)
}
func TestLocalBinaryPluginClose(t *testing.T) {
lbp := &LocalBinaryPlugin{}
lbp.stopCh = make(chan bool, 1)
go lbp.Close()
stopped := <-lbp.stopCh
if !stopped {
t.Fatal("Close did not send a stop message on the proper channel")
}
}
func TestExecServer(t *testing.T) {
log.IsDebug = true
machineName := "test"
logReader, logWriter := io.Pipe()
log.SetOutWriter(logWriter)
log.SetErrWriter(logWriter)
defer func() {
log.IsDebug = false
log.SetOutWriter(os.Stdout)
log.SetErrWriter(os.Stderr)
}()
stdoutReader, stdoutWriter := io.Pipe()
stderrReader, stderrWriter := io.Pipe()
fe := &FakeExecutor{
stdout: stdoutReader,
stderr: stderrReader,
}
lbp := &LocalBinaryPlugin{
MachineName: machineName,
Executor: fe,
addrCh: make(chan string, 1),
stopCh: make(chan bool, 1),
}
finalErr := make(chan error, 1)
// Start the docker-machine-foo plugin server
go func() {
finalErr <- lbp.execServer()
}()
expectedAddr := "127.0.0.1:12345"
expectedPluginOut := "Doing some fun plugin stuff..."
expectedPluginErr := "Uh oh, something in plugin went wrong..."
logScanner := bufio.NewScanner(logReader)
if _, err := io.WriteString(stdoutWriter, expectedAddr+"\n"); err != nil {
t.Fatalf("Error attempting to write plugin address: %s", err)
}
if addr := <-lbp.addrCh; addr != expectedAddr {
t.Fatalf("Expected to read the expected address properly in server but did not")
}
expectedOut := fmt.Sprintf("%s%s", fmt.Sprintf(pluginOutPrefix, machineName), expectedPluginOut)
if _, err := io.WriteString(stdoutWriter, expectedPluginOut+"\n"); err != nil {
t.Fatalf("Error attempting to write to out in plugin: %s", err)
}
if logScanner.Scan(); logScanner.Text() != expectedOut {
t.Fatalf("Output written to log was not what we expected\nexpected: %s\nactual: %s", expectedOut, logScanner.Text())
}
expectedErr := fmt.Sprintf("%s%s", fmt.Sprintf(pluginErrPrefix, machineName), expectedPluginErr)
if _, err := io.WriteString(stderrWriter, expectedPluginErr+"\n"); err != nil {
t.Fatalf("Error attempting to write to err in plugin: %s", err)
}
if logScanner.Scan(); logScanner.Text() != expectedErr {
t.Fatalf("Error written to log was not what we expected\nexpected: %s\nactual: %s", expectedErr, logScanner.Text())
}
lbp.Close()
if err := <-finalErr; err != nil {
t.Fatalf("Error serving: %s", err)
}
}

View File

@@ -0,0 +1,45 @@
package plugin
import (
"fmt"
"net"
"net/http"
"net/rpc"
"os"
"github.com/docker/machine/libmachine"
"github.com/docker/machine/libmachine/drivers"
"github.com/docker/machine/libmachine/drivers/plugin/localbinary"
"github.com/docker/machine/libmachine/drivers/rpc"
)
func RegisterDriver(d drivers.Driver) {
if os.Getenv(localbinary.PluginEnvKey) != localbinary.PluginEnvVal {
fmt.Fprintln(os.Stderr, `This is a Docker Machine plugin binary.
Plugin binaries are not intended to be invoked directly.
Please use this plugin through the main 'docker-machine' binary.`)
os.Exit(1)
}
libmachine.SetDebug(true)
rpcd := new(rpcdriver.RpcServerDriver)
rpcd.ActualDriver = d
rpcd.CloseCh = make(chan bool)
rpc.Register(rpcd)
rpc.HandleHTTP()
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
fmt.Fprintf(os.Stderr, "Error loading RPC server: %s\n", err)
os.Exit(1)
}
defer listener.Close()
fmt.Println(listener.Addr())
go http.Serve(listener, nil)
<-rpcd.CloseCh
}