# HG changeset patch # User sqwishy # Date 1385537112 28800 # Tue Nov 26 23:25:12 2013 -0800 # Node ID 89a54ca22f10f4108af6c52479d1be2cef5bcfe5 # Parent e949661d1885eab33ca1163c9e047de4555a8fd3 Rewrote fs event interaction code diff --git a/goquitur.go b/goquitur.go deleted file mode 100644 --- a/goquitur.go +++ /dev/null @@ -1,87 +0,0 @@ -package main - -import "encoding/json" -import "fmt" -import "os" -import "io" -import "io/ioutil" - -const ( - EventCreate = "create" - EventModify = "modify" - EventDelete = "delete" -) - -var ( - ResourceNotFound = fmt.Errorf("Required resource was not found") -) - -type Request struct { - Subscribe bool - Path string -} - -type Error struct { - Message string - Details string -} - -type State struct { - Event string `json:",omitempty"` - Path string - Payload interface{} -} - -type DelayedCachedReader struct { - File io.Reader - IsLoaded bool - data []byte -} - -func NewDelayedCachedReader(r io.Reader) *DelayedCachedReader { - return &DelayedCachedReader{r, false, nil} -} - -// This is nasty -func (r *DelayedCachedReader) MarshalJSON() ([]byte, error) { - if r.IsLoaded == false { - data, err := ioutil.ReadAll(r.File) - if err != nil { - return nil, err - } - data, err = json.Marshal(data) - if err != nil { - return nil, err - } - r.IsLoaded = true - r.data = data - } - return r.data, nil -} - -type Resource struct { - WebPath string - FSPath string -} - -type OpenResource struct { - *Resource - *os.File - os.FileInfo -} - -func (r *OpenResource) Payload() (interface{}, error) { - if r.IsDir() { - fis, err := r.Readdir(0) - if err != nil { - return nil, err - } - payload := make([]string, len(fis)) - for i, fi := range fis { - payload[i] = fi.Name() - } - return payload, nil - } else { - return NewDelayedCachedReader(r), nil - } -} diff --git a/resources.go b/resources.go new file mode 100644 --- /dev/null +++ b/resources.go @@ -0,0 +1,76 @@ +/* Includes stuff for dealing with resources + +A resource is an object that corresponds to something on the filesystem and +is provided to clients by the service; they are generally either files or +directories. +*/ + +package main + +import "fmt" +import "os" +import "io/ioutil" + +const ( + EventCreate = "create" + EventModify = "modify" + EventDelete = "delete" +) + +var ( + InvalidPath = fmt.Errorf("Invalid path") + ResourceNotFound = fmt.Errorf("Required resource was not found") +) + +type Request struct { + Subscribe bool + Path string +} + +type Error struct { + Message string + Details string +} + +type Message struct { + Event string `json:",omitempty"` + //URL string <- todo, do this + Path string + Payload interface{} `json:",omitempty"` +} + +type Location struct { + WebPath string + FSPath string +} + +type OpenResource struct { + *Location + *os.File + os.FileInfo +} + +func getPayload(path string) (interface{}, error) { + f, err := os.Open(path) + if err != nil { + return nil, err + } + stat, err := f.Stat() + if err != nil { + return nil, err + } + + if stat.IsDir() { + dirnames, err := f.Readdirnames(0) + if err != nil { + return nil, err + } + return dirnames, nil + } else { + data, err := ioutil.ReadAll(f) + if err != nil { + return nil, err + } + return string(data), nil + } +} diff --git a/server.go b/server.go --- a/server.go +++ b/server.go @@ -1,20 +1,28 @@ package main import "net/http" -import "os" +import _"os" import "log" -import "fmt" -import "strings" +import "sync" +import _"fmt" +import _"strings" import "path/filepath" - +import "encoding/json" import "code.google.com/p/go.net/websocket" import "github.com/howeyc/fsnotify" +// It shall be simple to: +// find clients given a resource +// remove a cilent from all resources +// add a client to one resource + type GoquiturHandler struct { *http.ServeMux watchdir string watcher *fsnotify.Watcher - subs map[string][]*websocket.Conn + maplock *sync.Mutex + urlmap map[string][]*websocket.Conn + submap map[*websocket.Conn][]string } func NewGoquiturHandler(watchdir string) (*GoquiturHandler, error) { @@ -22,6 +30,7 @@ if err != nil { return nil, err } + watcher, err := fsnotify.NewWatcher() if err != nil { return nil, err @@ -34,14 +43,156 @@ ServeMux: http.NewServeMux(), watchdir: watchdir, watcher: watcher, - subs: make(map[string][]*websocket.Conn), + maplock: new(sync.Mutex), + urlmap: make(map[string][]*websocket.Conn), + submap: make(map[*websocket.Conn][]string), } h.ServeMux.Handle("/", http.FileServer(http.Dir("static"))) - h.ServeMux.Handle("/ws", websocket.Handler(h.serveWS)) - go h.handleFsEvents() + //h.ServeMux.Handle("/ws", websocket.Handler(h.serveWS)) + + evchan, errchan := processWatcher(h.watcher) + + go h.handleEvents(evchan, errchan) + return h, nil } +func (h *GoquiturHandler) pathToLocation(path string) (*Location, error) { + webpath, err := filepath.Rel(h.watchdir, path) + if err != nil { + return nil, err + } + return &Location{webpath, path}, nil +} + +func (h *GoquiturHandler) Subscribers(path string) []*websocket.Conn { + h.maplock.Lock() + defer h.maplock.Unlock() + if v, ok := h.urlmap[path]; ok == true { + return v + } + return make([]*websocket.Conn, 0) +} + +func (h *GoquiturHandler) handleEvents(evchan <-chan *Event, errchan <-chan error) { + for { + select { + case ev := <-evchan: + log.Println("Got fs event!", ev) + + loc, err := h.pathToLocation(ev.Path) + if err != nil { + log.Println("Error handling fs event:", err) + continue + } + log.Println("event at", loc) + + switch ev.Type { + case EventCreate: + fallthrough + case EventModify: + fallthrough + case EventDelete: + var data []byte + for _, sub := range h.Subscribers(loc.WebPath) { + if data == nil { + // If we can't produce a valid value for data we can't + // handle this event at all, so we break and move on + // todo, set msg to some error thing and push that to + // clients + payload, err := getPayload(loc.FSPath) + if err != nil { + log.Println("Error getting message payload:", err) + break + } + msg := &Message{ + Event: ev.Type, + Path: loc.WebPath, + Payload: payload, + } + data, err = json.Marshal(msg) + if err != nil { + log.Println("Error encoding message:", err) + break + } + } + go func() { + _, err = sub.Write(data) + if err != nil { + log.Println("Error writing to a client:", err) + // todo, remove subscriber? + } + }() + } + default: + log.Println("Unknown event, ignoring:", ev.Type) + } + + case err := <-errchan: + log.Println("Error handling fs event:", err) + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +/* +func (h *GoquiturHandler) resAdded(res *Resource) error { + ores, err := h.openResource(res) + if err != nil { + return err + } + payload, err := ores.Payload() + if err != nil { + return err + } + h.notifyConns(res.WebPath, &State{ + Event: EventCreate, + Path: res.WebPath, + Payload: payload, + }) + if res.WebPath != "/" { + parent, err := h.resFromWebPath(filepath.Dir(res.WebPath)) + if err != nil { + return err + } + return h.resModified(parent) + } + return nil +} + +func (h *GoquiturHandler) resModified(res *Resource) error { + ores, err := h.openResource(res) + if err != nil { + return err + } + payload, err := ores.Payload() + if err != nil { + return err + } + h.notifyConns(res.WebPath, &State{ + Event: EventModify, + Path: res.WebPath, + Payload: payload, + }) + return nil +} + +func (h *GoquiturHandler) resRemoved(res *Resource) error { + h.notifyConns(res.WebPath, &State{ + Event: EventDelete, + Path: res.WebPath, + }) + if res.WebPath != "/" { + parent, err := h.resFromWebPath(filepath.Dir(res.WebPath)) + if err != nil { + return err + } + return h.resModified(parent) + } + return nil +} + +//////////////////////////////////////////////////////////////////////////////// func (h *GoquiturHandler) resFromWebPath(path string) (*Resource, error) { if path == "" { return nil, fmt.Errorf("Path must not be empty") @@ -108,7 +259,6 @@ Details: err.Error(), }) } - // Open the resource ores, err := h.openResource(res) if err != nil { @@ -140,35 +290,6 @@ }) } -func (h *GoquiturHandler) handleFsEvents() { - for { - log.Println("waiting for fs events") - select { - case ev := <-h.watcher.Event: - var err error - path := ev.Name[len(h.watchdir):] - log.Println("event:", ev, path) - res, err := h.resFromWebPath(path) - if err != nil { - log.Println("error resolving resource in handling fs event:", err) - } else { - if ev.IsDelete() || ev.IsRename() { - err = h.resRemoved(res) - } else if ev.IsModify() { - err = h.resModified(res) - } else if ev.IsCreate() { - err = h.resAdded(res) - } - if err != nil { - log.Println("error dealing with resource:", err) - } - } - case err := <-h.watcher.Error: - log.Println("error:", err) - } - } -} - func notifyConn(conn *websocket.Conn, msg interface{}) error { log.Printf("notifying %v with %+v", conn, msg) return websocket.JSON.Send(conn, msg) @@ -186,59 +307,4 @@ } } } - -func (h *GoquiturHandler) resAdded(res *Resource) error { - ores, err := h.openResource(res) - if err != nil { - return err - } - payload, err := ores.Payload() - if err != nil { - return err - } - h.notifyConns(res.WebPath, &State{ - Event: EventCreate, - Path: res.WebPath, - Payload: payload, - }) - if res.WebPath != "/" { - parent, err := h.resFromWebPath(filepath.Dir(res.WebPath)) - if err != nil { - return err - } - return h.resModified(parent) - } - return nil -} - -func (h *GoquiturHandler) resModified(res *Resource) error { - ores, err := h.openResource(res) - if err != nil { - return err - } - payload, err := ores.Payload() - if err != nil { - return err - } - h.notifyConns(res.WebPath, &State{ - Event: EventModify, - Path: res.WebPath, - Payload: payload, - }) - return nil -} - -func (h *GoquiturHandler) resRemoved(res *Resource) error { - h.notifyConns(res.WebPath, &State{ - Event: EventDelete, - Path: res.WebPath, - }) - if res.WebPath != "/" { - parent, err := h.resFromWebPath(filepath.Dir(res.WebPath)) - if err != nil { - return err - } - return h.resModified(parent) - } - return nil -} +*/ diff --git a/watch.go b/watch.go new file mode 100644 --- /dev/null +++ b/watch.go @@ -0,0 +1,51 @@ +package main + +import "log" +import "path/filepath" +import "github.com/howeyc/fsnotify" + +type Event struct { + Type string + Path string +} + +func processLoop(w *fsnotify.Watcher, evchan chan<- *Event, errchan chan<- error) { + log.Println("Waiting for fs events") + for { + select { + case fsev := <-w.Event: + log.Println("event:", fsev) + var err error + + path, err := filepath.Abs(fsev.Name) + if err != nil { + errchan <- err + continue + } + + ev := new(Event) + ev.Path = path + if fsev.IsDelete() || fsev.IsRename() { + ev.Type = EventDelete + } else if fsev.IsModify() { + ev.Type = EventModify + } else if fsev.IsCreate() { + ev.Type = EventCreate + } else { + log.Println("unhandled event, don't know what to do with event of this type") + continue + } + evchan <- ev + + case err := <-w.Error: + errchan <- err + } + } +} + +func processWatcher(w *fsnotify.Watcher) (<-chan *Event, <-chan error) { + evchan := make(chan *Event, 20) + errchan := make(chan error, 20) + go processLoop(w, evchan, errchan) + return evchan, errchan +}