e3d4fa27e497 — Sean Russell 12 years ago
Fixed a race condition
1 files changed, 89 insertions(+), 35 deletions(-)

M server/main.go
M server/main.go +89 -35
@@ 16,8 16,28 @@ import (
 ///////////////////////////////////////////////////////////////////////////////
 // RPC server ////////////////////////////////////////////////////////////////
 
+// This code needs to ensure that access to the database is sequential, so
+// there are some structures and functions that are used to enforce this.
+
+// Requests to access the DB come over this structure in the form of a
+// function to execute; this gets queued up, and when the function's
+// turn arrives, it gets called with a configuration to operate on.
+// When the function is complete (returns), a message is sent on the
+// +response+ channel.  One of these is created per incoming request.
+type Callback struct {
+	// The function to execute in turn
+	toExec func(*config.Configuration)
+	// When +toExec+ is complete, a message will be sent on this channel
+	response chan bool
+}
+
+// This structure represents the server -- when requests come in, they're 
+// synchronized over this +channel+.  There is one of these for the entire
+// server.
 type Wrapper struct {
-	channel chan func(*config.Configuration)
+	// Incoming messages from the network
+	channel chan Callback
+	// A message is sent on this when the server wants to exit
 	exit    chan bool
 }
 

          
@@ 34,7 54,7 @@ func RunServer(repo_dir string, port int
 	addr := ":" + strconv.Itoa(port)
 
 	var wrap Wrapper
-	wrap.channel = make(chan func(*config.Configuration))
+	wrap.channel = make(chan Callback)
 	wrap.exit = make(chan bool)
 
 	go func() {

          
@@ 49,7 69,8 @@ func RunServer(repo_dir string, port int
 		for {
 			select {
 			case message := <-wrap.channel:
-				message(conf)
+				message.toExec(conf)
+				message.response <- true
 			case <-wrap.exit:
 			}
 		}

          
@@ 68,77 89,107 @@ func RunServer(repo_dir string, port int
 	}
 }
 
+// Utility function to handle the synchronization of function execution.
+// Queues the +fun+ function, and waits for the completion message.
+// Does not return until the function is finished executing.
+func (serv *Wrapper) wrapAndWait(fun func(conf *config.Configuration)) {
+	var callback Callback
+	callback.toExec = fun
+	callback.response = make(chan bool)
+	serv.channel <- callback
+	<-callback.response
+}
+
+// Get the list of component ids and store it in +ids+
 func (serv *Wrapper) ComponentIds(_ string, ids *[]string) error {
-	serv.channel <- func(conf *config.Configuration) {
+	serv.wrapAndWait(func(conf *config.Configuration) {
 		c := conf.GetComponents()
 		*ids = c
-	}
-	return nil
-}
+	})
 
-func (serv *Wrapper) Component(path string, store *config.Store) error {
-	serv.channel <- func(conf *config.Configuration) {
-		//rev := values.Get("rev")
-		s := conf.GetComponent(path, "")
-		*store = s
-	}
 	return nil
 }
 
-func (serv *Wrapper) Workspaces(limit int, workspaces *map[string]([]string)) error {
-	serv.channel <- func(conf *config.Configuration) {
-		w := conf.GetWorkspaces(limit)
-		*workspaces = w
-	}
+// Fetch the contents of the component at +path+, and store it in +store+.  
+func (serv *Wrapper) Component(path string, store *config.Store) error {
+	serv.wrapAndWait(func(conf *config.Configuration) {
+		//rev := values.Get("rev")
+		s := conf.GetComponent(path, "")
+		*store = s
+	})
 	return nil
 }
 
-func (serv *Wrapper) Workspace(num string, workspace *string) error {
-	serv.channel <- func(conf *config.Configuration) {
-		w := conf.GetWorkspace(num)
-		*workspace = w
-	}
+// Get up to +limit+ workspaces, and store it in +workspaces+
+func (serv *Wrapper) Workspaces(limit int, workspaces *map[string]([]string)) error {
+	serv.wrapAndWait(func(conf *config.Configuration) {
+		w := conf.GetWorkspaces(limit)
+		*workspaces = w
+	})
 	return nil
 }
 
+// Get workspace number +num+ and store it in +workspace+
+func (serv *Wrapper) Workspace(num string, workspace *string) error {
+	serv.wrapAndWait(func(conf *config.Configuration) {
+		w := conf.GetWorkspace(num)
+		*workspace = w
+	})
+	return nil
+}
+
+// Replace a component.  +args+[0] must contain the payload (the JSON
+// data that will replace the current value), and +args+[1:] must contain
+// the path to the component. Parsing errors are swallowed, and if no
+// component already exists, this functions as an insert.
 func (serv *Wrapper) ReplaceComponent(args []string, _ *string) error {
-	serv.channel <- func(conf *config.Configuration) {
+	serv.wrapAndWait(func(conf *config.Configuration) {
 		body := args[0]
 		path := args[1:]
 		component := strings.TrimRight(strings.Join(path, "/"), "/")
 		var config config.Store
 		json.Unmarshal([]byte(body), &config)
 		conf.PutComponent(component, config)
-	}
+	})
 	return nil
 }
 
+// Store the workspace to disk, optionally tagging the commit with 
+// the name +tag+ (+tag+ may be "")
 func (serv *Wrapper) CommitWorkspace(tag string, _ *string) error {
-	serv.channel <- func(conf *config.Configuration) {
+	serv.wrapAndWait(func(conf *config.Configuration) {
 		conf.PutWorkspace(tag)
-	}
+	})
 	return nil
 }
 
+// Branch off a new workspace.  +args+[0] must be the parent revision,
+// and +args+[1] must be the branch name.
 func (serv *Wrapper) BranchWorkspace(args []string, _ *string) error {
-	serv.channel <- func(conf *config.Configuration) {
+	serv.wrapAndWait(func(conf *config.Configuration) {
 		parent_rev := args[0]
 		branch_name := args[1]
 		conf.Branch(parent_rev, branch_name)
-	}
+	})
 	return nil
 }
 
+// Remove a component from the workspace. +path+ is the name of the
+// component to remove.  If the component does not exist, NOP.
 func (serv *Wrapper) RemoveComponent(path []string, _ *string) error {
-	serv.channel <- func(conf *config.Configuration) {
+	serv.wrapAndWait(func(conf *config.Configuration) {
 		component := strings.TrimRight(strings.Join(path, "/"), "/")
 		conf.Delete(component)
-	}
+	})
 	return nil
 }
 
+// Set a component attribute value.  
+// +args+[0] is the path to the component.  
+// +args+[1] is the attribute name
+// +args+[2] is the new attribute value
 func (serv *Wrapper) Set(args []string, ok *bool) error {
-	serv.channel <- func(conf *config.Configuration) {
+	serv.wrapAndWait(func(conf *config.Configuration) {
 		path := args[0]
 		attribute := args[1]
 		value := args[2]

          
@@ 153,16 204,19 @@ func (serv *Wrapper) Set(args []string, 
 		store := conf.GetComponent(path, "")
 		store[attribute] = obj
 		conf.PutComponent(path, store)
-	}
+	})
 	return nil
 }
 
+// Get a single attribute from a component
+// +args+[0] is the path to the component
+// +args+[1] is the attribute name
 func (serv *Wrapper) Get(args []string, value *interface{}) error {
-	serv.channel <- func(conf *config.Configuration) {
+	serv.wrapAndWait(func(conf *config.Configuration) {
 		path := args[0]
 		attribute := args[1]
 		store := conf.GetComponent(path, "")
 		*value = store[attribute]
-	}
+	})
 	return nil
 }