# HG changeset patch # User Sean Russell # Date 1327975942 18000 # Mon Jan 30 21:12:22 2012 -0500 # Branch weekly # Node ID 8930acb4ed38fb56ac4fe0e3e7011c2aaa1dc07e # Parent e3d4fa27e4973134cae404304df6ab2406a618f7 Refactored common code into a library. diff --git a/restserver/main.go b/restserver/main.go --- a/restserver/main.go +++ b/restserver/main.go @@ -1,7 +1,7 @@ package main import ( - "code.google.com/p/gorest/gorest" + "gorest.googlecode.com/hg/gorest" "config" "encoding/json" "flag" @@ -11,6 +11,8 @@ "net/http" "os" "strings" + + "server" ) /////////////////////////////////////////////////////////////////////////////// @@ -46,19 +48,7 @@ } service := new(ConfigService) - service.channel = make(chan func(*config.Configuration)) - service.exit = make(chan bool) - go func() { - config := initConfig(*repo_dir) - for { - select { - case message := <-service.channel: - message(config) - case <-service.exit: - return - } - } - }() + service.wrapper = server.Start(*repo_dir) gorest.RegisterService(service) @@ -90,7 +80,7 @@ componentIds gorest.EndPoint `method:"GET" path:"/items/" output:"[]string"` component gorest.EndPoint `method:"GET" path:"/item/{...:string}" output:"Store"` workspaces gorest.EndPoint `method:"GET" path:"/workspaces/?{limit:int}" output:"Map"` - workspace gorest.EndPoint `method:"GET" path:"/workspace/" output:"string"` + workspace gorest.EndPoint `method:"GET" path:"/workspace/{Id:string}" output:"string"` exitCommand gorest.EndPoint `method:"GET" path:"/exit" output:"int"` replaceComponent gorest.EndPoint `method:"PUT" path:"/item/{...:string}" postdata:"string"` @@ -101,79 +91,51 @@ removeComponent gorest.EndPoint `method:"DELETE" path:"/item/{...:string}"` - channel chan func(*config.Configuration) - exit chan bool + wrapper *server.Server } -func (serv ConfigService) ComponentIds() []string { - var retVal *[]string - serv.channel <- func(conf *config.Configuration) { - *retVal = conf.GetComponents() - } - return *retVal +func (serv ConfigService) ComponentIds() (ids []string) { + ids = serv.ComponentIds() + return } func (serv ConfigService) Component(path ...string) config.Store { - var retVal *config.Store - serv.channel <- func(conf *config.Configuration) { - //rev := values.Get("rev") - component := strings.TrimRight(strings.Join(path, "/"), "/") - *retVal = conf.GetComponent(component, "") - } - return *retVal + // Check if it's a request for an attribute + component := strings.TrimRight(strings.Join(path, "/"), "/") + return *serv.wrapper.Component(component) } func (serv ConfigService) Workspaces(limit int) Map { - var retVal *Map - serv.channel <- func(conf *config.Configuration) { - *retVal = conf.GetWorkspaces(limit) - } - return *retVal + return *serv.wrapper.Workspaces(limit) } func (serv ConfigService) Workspace() string { - var retVal *string - serv.channel <- func(conf *config.Configuration) { - *retVal = conf.GetWorkspace("0") - } - return *retVal + return *serv.wrapper.Workspace("0") } func (serv ConfigService) ReplaceComponent(body string, path ...string) { - serv.channel <- func(conf *config.Configuration) { - component := strings.TrimRight(strings.Join(path, "/"), "/") - var store config.Store - json.Unmarshal([]byte(body), &store) - conf.PutComponent(component, store) - } + component := strings.TrimRight(strings.Join(path, "/"), "/") + serv.wrapper.ReplaceComponent(body, component) } -func (serv ConfigService) CommitWorkspace(_ string) { - serv.channel <- func(conf *config.Configuration) { - conf.PutWorkspace("") - } +func (serv ConfigService) CommitWorkspace(tag string) { + serv.wrapper.CommitWorkspace("") } func (serv ConfigService) CommitTagWorkspace(_ string, tag string) { - serv.channel <- func(conf *config.Configuration) { - conf.PutWorkspace(tag) - } + serv.wrapper.CommitWorkspace(tag) } func (serv ConfigService) BranchWorkspace(_ string, parent_rev string, branch_name string) { - serv.channel <- func(conf *config.Configuration) { - conf.Branch(parent_rev, branch_name) - } + serv.wrapper.BranchWorkspace(parent_rev, branch_name) } func (serv ConfigService) RemoveComponent(path ...string) { - serv.channel <- func(conf *config.Configuration) { - component := strings.TrimRight(strings.Join(path, "/"), "/") - conf.Delete(component) - } + component := strings.TrimRight(strings.Join(path, "/"), "/") + serv.wrapper.RemoveComponent(component) } func (serv ConfigService) ExitCommand() int { - serv.exit <- true + serv.wrapper.Stop() return 0 } diff --git a/server/main.go b/server/main.go --- a/server/main.go +++ b/server/main.go @@ -5,76 +5,31 @@ "encoding/json" "flag" "log" - "net" "net/rpc" "net/rpc/jsonrpc" "strconv" "strings" + + "server" ) /////////////////////////////////////////////////////////////////////////////// // RPC server //////////////////////////////////////////////////////////////// -// This code needs to ensure that access to the database is sequential, so -// there are some structures and functions that are used to enforce this. - -// Requests to access the DB come over this structure in the form of a -// function to execute; this gets queued up, and when the function's -// turn arrives, it gets called with a configuration to operate on. -// When the function is complete (returns), a message is sent on the -// +response+ channel. One of these is created per incoming request. -type Callback struct { - // The function to execute in turn - toExec func(*config.Configuration) - // When +toExec+ is complete, a message will be sent on this channel - response chan bool -} - -// This structure represents the server -- when requests come in, they're -// synchronized over this +channel+. There is one of these for the entire -// server. -type Wrapper struct { - // Incoming messages from the network - channel chan Callback - // A message is sent on this when the server wants to exit - exit chan bool -} - func main() { var port *int = flag.Int("p", 8125, "Listen port (8125)") var repo_dir *string = flag.String("d", "configserver.db", "Name of the config DB directory (configserver.db") flag.Parse() - RunServer(*repo_dir, *port, nil) + server.RunServer(*repo_dir, *port, nil) } func RunServer(repo_dir string, port int, ready chan error) { addr := ":" + strconv.Itoa(port) - var wrap Wrapper - wrap.channel = make(chan Callback) - wrap.exit = make(chan bool) - - go func() { - hg := new(config.Hg) - hg.Init(repo_dir) - var storeMap config.StoreMap - conf := new(config.Configuration) - json.Unmarshal([]byte(hg.Get("")), &storeMap) - conf.Workspace = storeMap - conf.Repo = hg - - for { - select { - case message := <-wrap.channel: - message.toExec(conf) - message.response <- true - case <-wrap.exit: - } - } - }() + wrap := server.Start(repo_dir) log.Printf("Starting server on port %s\n", addr) rpc.Register(&wrap) @@ -89,134 +44,54 @@ } } -// Utility function to handle the synchronization of function execution. -// Queues the +fun+ function, and waits for the completion message. -// Does not return until the function is finished executing. -func (serv *Wrapper) wrapAndWait(fun func(conf *config.Configuration)) { - var callback Callback - callback.toExec = fun - callback.response = make(chan bool) - serv.channel <- callback - <-callback.response -} - -// Get the list of component ids and store it in +ids+ func (serv *Wrapper) ComponentIds(_ string, ids *[]string) error { - serv.wrapAndWait(func(conf *config.Configuration) { - c := conf.GetComponents() - *ids = c - }) - + *ids = serv.server.ComponentIds() return nil } -// Fetch the contents of the component at +path+, and store it in +store+. func (serv *Wrapper) Component(path string, store *config.Store) error { - serv.wrapAndWait(func(conf *config.Configuration) { - //rev := values.Get("rev") - s := conf.GetComponent(path, "") - *store = s - }) + *store = serv.server.Component(path) return nil } -// Get up to +limit+ workspaces, and store it in +workspaces+ func (serv *Wrapper) Workspaces(limit int, workspaces *map[string]([]string)) error { - serv.wrapAndWait(func(conf *config.Configuration) { - w := conf.GetWorkspaces(limit) - *workspaces = w - }) + *workspaces = serv.server.Workspaces(limit) return nil } -// Get workspace number +num+ and store it in +workspace+ func (serv *Wrapper) Workspace(num string, workspace *string) error { - serv.wrapAndWait(func(conf *config.Configuration) { - w := conf.GetWorkspace(num) - *workspace = w - }) + *workspace = serv.server.Worspace(num) return nil } -// Replace a component. +args+[0] must contain the payload (the JSON -// data that will replace the current value), and +args+[1:] must contain -// the path to the component. Parsing errors are swallowed, and if no -// component already exists, this functions as an insert. func (serv *Wrapper) ReplaceComponent(args []string, _ *string) error { - serv.wrapAndWait(func(conf *config.Configuration) { - body := args[0] - path := args[1:] - component := strings.TrimRight(strings.Join(path, "/"), "/") - var config config.Store - json.Unmarshal([]byte(body), &config) - conf.PutComponent(component, config) - }) + component := strings.TrimRight(strings.Join(path[1:], "/"), "/") + serv.server.ReplaceComponent(args[0], component) return nil } -// Store the workspace to disk, optionally tagging the commit with -// the name +tag+ (+tag+ may be "") func (serv *Wrapper) CommitWorkspace(tag string, _ *string) error { - serv.wrapAndWait(func(conf *config.Configuration) { - conf.PutWorkspace(tag) - }) + serv.server.CommitWorkspace(tag) return nil } -// Branch off a new workspace. +args+[0] must be the parent revision, -// and +args+[1] must be the branch name. func (serv *Wrapper) BranchWorkspace(args []string, _ *string) error { - serv.wrapAndWait(func(conf *config.Configuration) { - parent_rev := args[0] - branch_name := args[1] - conf.Branch(parent_rev, branch_name) - }) - return nil -} - -// Remove a component from the workspace. +path+ is the name of the -// component to remove. If the component does not exist, NOP. -func (serv *Wrapper) RemoveComponent(path []string, _ *string) error { - serv.wrapAndWait(func(conf *config.Configuration) { - component := strings.TrimRight(strings.Join(path, "/"), "/") - conf.Delete(component) - }) + serv.server.BranchWorkspace(args[0], args[1]) return nil } -// Set a component attribute value. -// +args+[0] is the path to the component. -// +args+[1] is the attribute name -// +args+[2] is the new attribute value -func (serv *Wrapper) Set(args []string, ok *bool) error { - serv.wrapAndWait(func(conf *config.Configuration) { - path := args[0] - attribute := args[1] - value := args[2] - if value[0] != "["[0] && value[0] != "{"[0] { - value = `"` + value + `"` - } - log.Printf("attr = %v, value = %v", attribute, value) - log.Printf("uint8 = %v", []byte(value)) - var obj interface{} - json.Unmarshal([]byte(value), &obj) - log.Printf("obj = %v", obj) - store := conf.GetComponent(path, "") - store[attribute] = obj - conf.PutComponent(path, store) - }) +func (serv *Wrapper) RemoveComponent(path []string, _ *string) error { + component := strings.TrimRight(strings.Join(path, "/"), "/") + serv.server.RemoveComponent(component) return nil } -// Get a single attribute from a component -// +args+[0] is the path to the component -// +args+[1] is the attribute name -func (serv *Wrapper) Get(args []string, value *interface{}) error { - serv.wrapAndWait(func(conf *config.Configuration) { - path := args[0] - attribute := args[1] - store := conf.GetComponent(path, "") - *value = store[attribute] - }) +func (serv *Wrapper) Set(args []string, ok *bool) error { + *ok = serv.server.Set(args[0], args[1], args[2]) return nil } + +func (serv *Wrapper) Get(args []string, value *interface{}) error { + *value = serv.server.Get(args[0], args[1]) + return nil +} diff --git a/server/server.go b/server/server.go new file mode 100644 --- /dev/null +++ b/server/server.go @@ -0,0 +1,173 @@ +package server + +import ( + "config" + + "encoding/json" + "log" +) + +// This code needs to ensure that access to the database is sequential, so +// there are some structures and functions that are used to enforce this. + +// Requests to access the DB come over this structure in the form of a +// function to execute; this gets queued up, and when the function's +// turn arrives, it gets called with a configuration to operate on. +// When the function is complete (returns), a message is sent on the +// +response+ channel. One of these is created per incoming request. +type Callback struct { + // The function to execute in turn + toExec func(*config.Configuration) + // When +toExec+ is complete, a message will be sent on this channel + response chan bool +} + +// This structure represents the server -- when requests come in, they're +// synchronized over this +channel+. There is one of these for the entire +// server. +type Server struct { + // Incoming messages from the network + Channel chan Callback + // A message is sent on this when the server wants to exit + exit chan bool +} + +func Start(repo_dir string) *Server { + wrap := new(Server) + wrap.Channel = make(chan Callback) + wrap.exit = make(chan bool) + go func() { + hg := new(config.Hg) + hg.Init(repo_dir) + var storeMap config.StoreMap + conf := new(config.Configuration) + json.Unmarshal([]byte(hg.Get("")), &storeMap) + conf.Workspace = storeMap + conf.Repo = hg + + for { + select { + case message := <-wrap.Channel: + message.toExec(conf) + message.response <- true + case <-wrap.exit: + } + } + }() + return wrap +} + +func (serv *Server) Stop() { + serv.exit <- true +} + +// Utility function to handle the synchronization of function execution. +// Queues the +fun+ function, and waits for the completion message. +// Does not return until the function is finished executing. +func (serv *Server) wrapAndWait(fun func(conf *config.Configuration)) { + var callback Callback + callback.toExec = fun + callback.response = make(chan bool) + serv.Channel <- callback + <-callback.response +} + +// Get the list of component ids and store it in +ids+ +func (serv *Server) ComponentIds() (ids *[]string) { + serv.wrapAndWait(func(conf *config.Configuration) { + c := conf.GetComponents() + *ids = c + }) + return +} + +// Fetch the contents of the component at +path+, and store it in +store+. +func (serv *Server) Component(path string) (store *config.Store) { + serv.wrapAndWait(func(conf *config.Configuration) { + //rev := values.Get("rev") + s := conf.GetComponent(path, "") + *store = s + }) + return +} + +// Get up to +limit+ workspaces, and store it in +workspaces+ +func (serv *Server) Workspaces(limit int) (workspaces *map[string]([]string)) { + serv.wrapAndWait(func(conf *config.Configuration) { + w := conf.GetWorkspaces(limit) + *workspaces = w + }) + return +} + +// Get workspace number +num+ and store it in +workspace+ +func (serv *Server) Workspace(num string) (workspace *string) { + serv.wrapAndWait(func(conf *config.Configuration) { + w := conf.GetWorkspace(num) + *workspace = w + }) + return +} + +// Replace a component. +args+[0] must contain the payload (the JSON +// data that will replace the current value), and +args+[1:] must contain +// the path to the component. Parsing errors are swallowed, and if no +// component already exists, this functions as an insert. +func (serv *Server) ReplaceComponent(body string, path string) { + serv.wrapAndWait(func(conf *config.Configuration) { + var config config.Store + json.Unmarshal([]byte(body), &config) + conf.PutComponent(path, config) + }) +} + +// Store the workspace to disk, optionally tagging the commit with +// the name +tag+ (+tag+ may be "") +func (serv *Server) CommitWorkspace(tag string) { + serv.wrapAndWait(func(conf *config.Configuration) { + conf.PutWorkspace(tag) + }) +} + +// Branch off a new workspace. +args+[0] must be the parent revision, +// and +args+[1] must be the branch name. +func (serv *Server) BranchWorkspace(parent_rev, branch_name string) { + serv.wrapAndWait(func(conf *config.Configuration) { + conf.Branch(parent_rev, branch_name) + }) +} + +// Remove a component from the workspace. +path+ is the name of the +// component to remove. If the component does not exist, NOP. +func (serv *Server) RemoveComponent(component string) { + serv.wrapAndWait(func(conf *config.Configuration) { + conf.Delete(component) + }) +} + +// Set a component attribute value. +func (serv *Server) Set(path, attribute, value string) (ok *bool) { + serv.wrapAndWait(func(conf *config.Configuration) { + if value[0] != "["[0] && value[0] != "{"[0] { + value = `"` + value + `"` + } + log.Printf("attr = %v, value = %v", attribute, value) + log.Printf("uint8 = %v", []byte(value)) + var obj interface{} + json.Unmarshal([]byte(value), &obj) + log.Printf("obj = %v", obj) + store := conf.GetComponent(path, "") + store[attribute] = obj + conf.PutComponent(path, store) + }) + return +} + +// Get a single attribute from a component +func (serv *Server) Get(path, attribute string) (value *interface{}) { + serv.wrapAndWait(func(conf *config.Configuration) { + store := conf.GetComponent(path, "") + *value = store[attribute] + }) + return +}