@@ 16,8 16,28 @@ import (
///////////////////////////////////////////////////////////////////////////////
// RPC server ////////////////////////////////////////////////////////////////
+// This code needs to ensure that access to the database is sequential, so
+// there are some structures and functions that are used to enforce this.
+
+// Requests to access the DB come over this structure in the form of a
+// function to execute; this gets queued up, and when the function's
+// turn arrives, it gets called with a configuration to operate on.
+// When the function is complete (returns), a message is sent on the
+// +response+ channel. One of these is created per incoming request.
+type Callback struct {
+ // The function to execute in turn
+ toExec func(*config.Configuration)
+ // When +toExec+ is complete, a message will be sent on this channel
+ response chan bool
+}
+
+// This structure represents the server -- when requests come in, they're
+// synchronized over this +channel+. There is one of these for the entire
+// server.
type Wrapper struct {
- channel chan func(*config.Configuration)
+ // Incoming messages from the network
+ channel chan Callback
+ // A message is sent on this when the server wants to exit
exit chan bool
}
@@ 34,7 54,7 @@ func RunServer(repo_dir string, port int
addr := ":" + strconv.Itoa(port)
var wrap Wrapper
- wrap.channel = make(chan func(*config.Configuration))
+ wrap.channel = make(chan Callback)
wrap.exit = make(chan bool)
go func() {
@@ 49,7 69,8 @@ func RunServer(repo_dir string, port int
for {
select {
case message := <-wrap.channel:
- message(conf)
+ message.toExec(conf)
+ message.response <- true
case <-wrap.exit:
}
}
@@ 68,77 89,107 @@ func RunServer(repo_dir string, port int
}
}
+// Utility function to handle the synchronization of function execution.
+// Queues the +fun+ function, and waits for the completion message.
+// Does not return until the function is finished executing.
+func (serv *Wrapper) wrapAndWait(fun func(conf *config.Configuration)) {
+ var callback Callback
+ callback.toExec = fun
+ callback.response = make(chan bool)
+ serv.channel <- callback
+ <-callback.response
+}
+
+// Get the list of component ids and store it in +ids+
func (serv *Wrapper) ComponentIds(_ string, ids *[]string) error {
- serv.channel <- func(conf *config.Configuration) {
+ serv.wrapAndWait(func(conf *config.Configuration) {
c := conf.GetComponents()
*ids = c
- }
- return nil
-}
+ })
-func (serv *Wrapper) Component(path string, store *config.Store) error {
- serv.channel <- func(conf *config.Configuration) {
- //rev := values.Get("rev")
- s := conf.GetComponent(path, "")
- *store = s
- }
return nil
}
-func (serv *Wrapper) Workspaces(limit int, workspaces *map[string]([]string)) error {
- serv.channel <- func(conf *config.Configuration) {
- w := conf.GetWorkspaces(limit)
- *workspaces = w
- }
+// Fetch the contents of the component at +path+, and store it in +store+.
+func (serv *Wrapper) Component(path string, store *config.Store) error {
+ serv.wrapAndWait(func(conf *config.Configuration) {
+ //rev := values.Get("rev")
+ s := conf.GetComponent(path, "")
+ *store = s
+ })
return nil
}
-func (serv *Wrapper) Workspace(num string, workspace *string) error {
- serv.channel <- func(conf *config.Configuration) {
- w := conf.GetWorkspace(num)
- *workspace = w
- }
+// Get up to +limit+ workspaces, and store it in +workspaces+
+func (serv *Wrapper) Workspaces(limit int, workspaces *map[string]([]string)) error {
+ serv.wrapAndWait(func(conf *config.Configuration) {
+ w := conf.GetWorkspaces(limit)
+ *workspaces = w
+ })
return nil
}
+// Get workspace number +num+ and store it in +workspace+
+func (serv *Wrapper) Workspace(num string, workspace *string) error {
+ serv.wrapAndWait(func(conf *config.Configuration) {
+ w := conf.GetWorkspace(num)
+ *workspace = w
+ })
+ return nil
+}
+
+// Replace a component. +args+[0] must contain the payload (the JSON
+// data that will replace the current value), and +args+[1:] must contain
+// the path to the component. Parsing errors are swallowed, and if no
+// component already exists, this functions as an insert.
func (serv *Wrapper) ReplaceComponent(args []string, _ *string) error {
- serv.channel <- func(conf *config.Configuration) {
+ serv.wrapAndWait(func(conf *config.Configuration) {
body := args[0]
path := args[1:]
component := strings.TrimRight(strings.Join(path, "/"), "/")
var config config.Store
json.Unmarshal([]byte(body), &config)
conf.PutComponent(component, config)
- }
+ })
return nil
}
+// Store the workspace to disk, optionally tagging the commit with
+// the name +tag+ (+tag+ may be "")
func (serv *Wrapper) CommitWorkspace(tag string, _ *string) error {
- serv.channel <- func(conf *config.Configuration) {
+ serv.wrapAndWait(func(conf *config.Configuration) {
conf.PutWorkspace(tag)
- }
+ })
return nil
}
+// Branch off a new workspace. +args+[0] must be the parent revision,
+// and +args+[1] must be the branch name.
func (serv *Wrapper) BranchWorkspace(args []string, _ *string) error {
- serv.channel <- func(conf *config.Configuration) {
+ serv.wrapAndWait(func(conf *config.Configuration) {
parent_rev := args[0]
branch_name := args[1]
conf.Branch(parent_rev, branch_name)
- }
+ })
return nil
}
+// Remove a component from the workspace. +path+ is the name of the
+// component to remove. If the component does not exist, NOP.
func (serv *Wrapper) RemoveComponent(path []string, _ *string) error {
- serv.channel <- func(conf *config.Configuration) {
+ serv.wrapAndWait(func(conf *config.Configuration) {
component := strings.TrimRight(strings.Join(path, "/"), "/")
conf.Delete(component)
- }
+ })
return nil
}
+// Set a component attribute value.
+// +args+[0] is the path to the component.
+// +args+[1] is the attribute name
+// +args+[2] is the new attribute value
func (serv *Wrapper) Set(args []string, ok *bool) error {
- serv.channel <- func(conf *config.Configuration) {
+ serv.wrapAndWait(func(conf *config.Configuration) {
path := args[0]
attribute := args[1]
value := args[2]
@@ 153,16 204,19 @@ func (serv *Wrapper) Set(args []string,
store := conf.GetComponent(path, "")
store[attribute] = obj
conf.PutComponent(path, store)
- }
+ })
return nil
}
+// Get a single attribute from a component
+// +args+[0] is the path to the component
+// +args+[1] is the attribute name
func (serv *Wrapper) Get(args []string, value *interface{}) error {
- serv.channel <- func(conf *config.Configuration) {
+ serv.wrapAndWait(func(conf *config.Configuration) {
path := args[0]
attribute := args[1]
store := conf.GetComponent(path, "")
*value = store[attribute]
- }
+ })
return nil
}