# HG changeset patch # User Sean Russell # Date 1327847614 18000 # Sun Jan 29 09:33:34 2012 -0500 # Branch weekly # Node ID e3d4fa27e4973134cae404304df6ab2406a618f7 # Parent 7f22503aa8e1858af430889ea20704f1172b9e78 Fixed a race condition diff --git a/server/main.go b/server/main.go --- a/server/main.go +++ b/server/main.go @@ -16,8 +16,28 @@ /////////////////////////////////////////////////////////////////////////////// // RPC server //////////////////////////////////////////////////////////////// +// This code needs to ensure that access to the database is sequential, so +// there are some structures and functions that are used to enforce this. + +// Requests to access the DB come over this structure in the form of a +// function to execute; this gets queued up, and when the function's +// turn arrives, it gets called with a configuration to operate on. +// When the function is complete (returns), a message is sent on the +// +response+ channel. One of these is created per incoming request. +type Callback struct { + // The function to execute in turn + toExec func(*config.Configuration) + // When +toExec+ is complete, a message will be sent on this channel + response chan bool +} + +// This structure represents the server -- when requests come in, they're +// synchronized over this +channel+. There is one of these for the entire +// server. type Wrapper struct { - channel chan func(*config.Configuration) + // Incoming messages from the network + channel chan Callback + // A message is sent on this when the server wants to exit exit chan bool } @@ -34,7 +54,7 @@ addr := ":" + strconv.Itoa(port) var wrap Wrapper - wrap.channel = make(chan func(*config.Configuration)) + wrap.channel = make(chan Callback) wrap.exit = make(chan bool) go func() { @@ -49,7 +69,8 @@ for { select { case message := <-wrap.channel: - message(conf) + message.toExec(conf) + message.response <- true case <-wrap.exit: } } @@ -68,77 +89,107 @@ } } +// Utility function to handle the synchronization of function execution. +// Queues the +fun+ function, and waits for the completion message. +// Does not return until the function is finished executing. +func (serv *Wrapper) wrapAndWait(fun func(conf *config.Configuration)) { + var callback Callback + callback.toExec = fun + callback.response = make(chan bool) + serv.channel <- callback + <-callback.response +} + +// Get the list of component ids and store it in +ids+ func (serv *Wrapper) ComponentIds(_ string, ids *[]string) error { - serv.channel <- func(conf *config.Configuration) { + serv.wrapAndWait(func(conf *config.Configuration) { c := conf.GetComponents() *ids = c - } - return nil -} + }) -func (serv *Wrapper) Component(path string, store *config.Store) error { - serv.channel <- func(conf *config.Configuration) { - //rev := values.Get("rev") - s := conf.GetComponent(path, "") - *store = s - } return nil } -func (serv *Wrapper) Workspaces(limit int, workspaces *map[string]([]string)) error { - serv.channel <- func(conf *config.Configuration) { - w := conf.GetWorkspaces(limit) - *workspaces = w - } +// Fetch the contents of the component at +path+, and store it in +store+. +func (serv *Wrapper) Component(path string, store *config.Store) error { + serv.wrapAndWait(func(conf *config.Configuration) { + //rev := values.Get("rev") + s := conf.GetComponent(path, "") + *store = s + }) return nil } -func (serv *Wrapper) Workspace(num string, workspace *string) error { - serv.channel <- func(conf *config.Configuration) { - w := conf.GetWorkspace(num) - *workspace = w - } +// Get up to +limit+ workspaces, and store it in +workspaces+ +func (serv *Wrapper) Workspaces(limit int, workspaces *map[string]([]string)) error { + serv.wrapAndWait(func(conf *config.Configuration) { + w := conf.GetWorkspaces(limit) + *workspaces = w + }) return nil } +// Get workspace number +num+ and store it in +workspace+ +func (serv *Wrapper) Workspace(num string, workspace *string) error { + serv.wrapAndWait(func(conf *config.Configuration) { + w := conf.GetWorkspace(num) + *workspace = w + }) + return nil +} + +// Replace a component. +args+[0] must contain the payload (the JSON +// data that will replace the current value), and +args+[1:] must contain +// the path to the component. Parsing errors are swallowed, and if no +// component already exists, this functions as an insert. func (serv *Wrapper) ReplaceComponent(args []string, _ *string) error { - serv.channel <- func(conf *config.Configuration) { + serv.wrapAndWait(func(conf *config.Configuration) { body := args[0] path := args[1:] component := strings.TrimRight(strings.Join(path, "/"), "/") var config config.Store json.Unmarshal([]byte(body), &config) conf.PutComponent(component, config) - } + }) return nil } +// Store the workspace to disk, optionally tagging the commit with +// the name +tag+ (+tag+ may be "") func (serv *Wrapper) CommitWorkspace(tag string, _ *string) error { - serv.channel <- func(conf *config.Configuration) { + serv.wrapAndWait(func(conf *config.Configuration) { conf.PutWorkspace(tag) - } + }) return nil } +// Branch off a new workspace. +args+[0] must be the parent revision, +// and +args+[1] must be the branch name. func (serv *Wrapper) BranchWorkspace(args []string, _ *string) error { - serv.channel <- func(conf *config.Configuration) { + serv.wrapAndWait(func(conf *config.Configuration) { parent_rev := args[0] branch_name := args[1] conf.Branch(parent_rev, branch_name) - } + }) return nil } +// Remove a component from the workspace. +path+ is the name of the +// component to remove. If the component does not exist, NOP. func (serv *Wrapper) RemoveComponent(path []string, _ *string) error { - serv.channel <- func(conf *config.Configuration) { + serv.wrapAndWait(func(conf *config.Configuration) { component := strings.TrimRight(strings.Join(path, "/"), "/") conf.Delete(component) - } + }) return nil } +// Set a component attribute value. +// +args+[0] is the path to the component. +// +args+[1] is the attribute name +// +args+[2] is the new attribute value func (serv *Wrapper) Set(args []string, ok *bool) error { - serv.channel <- func(conf *config.Configuration) { + serv.wrapAndWait(func(conf *config.Configuration) { path := args[0] attribute := args[1] value := args[2] @@ -153,16 +204,19 @@ store := conf.GetComponent(path, "") store[attribute] = obj conf.PutComponent(path, store) - } + }) return nil } +// Get a single attribute from a component +// +args+[0] is the path to the component +// +args+[1] is the attribute name func (serv *Wrapper) Get(args []string, value *interface{}) error { - serv.channel <- func(conf *config.Configuration) { + serv.wrapAndWait(func(conf *config.Configuration) { path := args[0] attribute := args[1] store := conf.GetComponent(path, "") *value = store[attribute] - } + }) return nil }