Rewrote fs event interaction code
4 files changed, 287 insertions(+), 181 deletions(-)

R goquitur.go => 
A => resources.go
M server.go
A => watch.go
R goquitur.go =>  +0 -87
@@ 1,87 0,0 @@ 
-package main
-
-import "encoding/json"
-import "fmt"
-import "os"
-import "io"
-import "io/ioutil"
-
-const (
-    EventCreate         = "create"
-    EventModify         = "modify"
-    EventDelete         = "delete"
-)
-
-var (
-    ResourceNotFound    = fmt.Errorf("Required resource was not found")
-)
-
-type Request struct {
-    Subscribe       bool
-    Path            string
-}
-
-type Error struct {
-    Message         string
-    Details         string
-}
-
-type State struct {
-    Event           string      `json:",omitempty"`
-    Path            string
-    Payload         interface{}
-}
-
-type DelayedCachedReader struct {
-    File            io.Reader
-    IsLoaded        bool
-    data            []byte
-}
-
-func NewDelayedCachedReader(r io.Reader) *DelayedCachedReader {
-    return &DelayedCachedReader{r, false, nil}
-}
-
-// This is nasty
-func (r *DelayedCachedReader) MarshalJSON() ([]byte, error) {
-    if r.IsLoaded == false {
-        data, err := ioutil.ReadAll(r.File)
-        if err != nil {
-            return nil, err
-        }
-        data, err = json.Marshal(data)
-        if err != nil {
-            return nil, err
-        }
-        r.IsLoaded = true
-        r.data = data
-    }
-    return r.data, nil
-}
-
-type Resource struct {
-    WebPath         string
-    FSPath          string
-}
-
-type OpenResource struct {
-    *Resource
-    *os.File
-    os.FileInfo
-}
-
-func (r *OpenResource) Payload() (interface{}, error) {
-    if r.IsDir() {
-        fis, err := r.Readdir(0)
-        if err != nil {
-            return nil, err
-        }
-        payload := make([]string, len(fis))
-        for i, fi := range fis {
-            payload[i] = fi.Name()
-        }
-        return payload, nil
-    } else {
-        return NewDelayedCachedReader(r), nil
-    }
-}

          
A => resources.go +76 -0
@@ 0,0 1,76 @@ 
+/* Includes stuff for dealing with resources
+
+A resource is an object that corresponds to something on the filesystem and
+is provided to clients by the service; they are generally either files or
+directories.
+*/
+
+package main
+
+import "fmt"
+import "os"
+import "io/ioutil"
+
+const (
+    EventCreate         = "create"
+    EventModify         = "modify"
+    EventDelete         = "delete"
+)
+
+var (
+    InvalidPath         = fmt.Errorf("Invalid path")
+    ResourceNotFound    = fmt.Errorf("Required resource was not found")
+)
+
+type Request struct {
+    Subscribe       bool
+    Path            string
+}
+
+type Error struct {
+    Message         string
+    Details         string
+}
+
+type Message struct {
+    Event       string      `json:",omitempty"`
+    //URL         string <- todo, do this
+    Path        string
+    Payload     interface{} `json:",omitempty"`
+}
+
+type Location struct {
+    WebPath         string
+    FSPath          string
+}
+
+type OpenResource struct {
+    *Location
+    *os.File
+    os.FileInfo
+}
+
+func getPayload(path string) (interface{}, error) {
+    f, err := os.Open(path)
+    if err != nil {
+        return nil, err
+    }
+    stat, err := f.Stat()
+    if err != nil {
+        return nil, err
+    }
+
+    if stat.IsDir() {
+        dirnames, err := f.Readdirnames(0)
+        if err != nil {
+            return nil, err
+        }
+        return dirnames, nil
+    } else {
+        data, err := ioutil.ReadAll(f)
+        if err != nil {
+            return nil, err
+        }
+        return string(data), nil
+    }
+}

          
M server.go +160 -94
@@ 1,20 1,28 @@ 
 package main
 
 import "net/http"
-import "os"
+import _"os"
 import "log"
-import "fmt"
-import "strings"
+import "sync"
+import _"fmt"
+import _"strings"
 import "path/filepath"
-
+import "encoding/json"
 import "code.google.com/p/go.net/websocket"
 import "github.com/howeyc/fsnotify"
 
+// It shall be simple to:
+//   find clients given a resource
+//   remove a cilent from all resources
+//   add a client to one resource
+
 type GoquiturHandler struct {
     *http.ServeMux
     watchdir        string
     watcher         *fsnotify.Watcher
-    subs            map[string][]*websocket.Conn
+    maplock         *sync.Mutex
+    urlmap          map[string][]*websocket.Conn
+    submap          map[*websocket.Conn][]string
 }
 
 func NewGoquiturHandler(watchdir string) (*GoquiturHandler, error) {

          
@@ 22,6 30,7 @@ func NewGoquiturHandler(watchdir string)
     if err != nil {
         return nil, err
     }
+
     watcher, err := fsnotify.NewWatcher()
     if err != nil {
         return nil, err

          
@@ 34,14 43,156 @@ func NewGoquiturHandler(watchdir string)
         ServeMux: http.NewServeMux(),
         watchdir: watchdir,
         watcher: watcher,
-        subs: make(map[string][]*websocket.Conn),
+        maplock: new(sync.Mutex),
+        urlmap: make(map[string][]*websocket.Conn),
+        submap: make(map[*websocket.Conn][]string),
     }
     h.ServeMux.Handle("/", http.FileServer(http.Dir("static")))
-    h.ServeMux.Handle("/ws", websocket.Handler(h.serveWS))
-    go h.handleFsEvents()
+    //h.ServeMux.Handle("/ws", websocket.Handler(h.serveWS))
+
+    evchan, errchan := processWatcher(h.watcher)
+
+    go h.handleEvents(evchan, errchan)
+
     return h, nil
 }
 
+func (h *GoquiturHandler) pathToLocation(path string) (*Location, error) {
+    webpath, err := filepath.Rel(h.watchdir, path)
+    if err != nil {
+        return nil, err
+    }
+    return &Location{webpath, path}, nil
+}
+
+func (h *GoquiturHandler) Subscribers(path string) []*websocket.Conn {
+    h.maplock.Lock()
+    defer h.maplock.Unlock()
+    if v, ok := h.urlmap[path]; ok == true {
+        return v
+    }
+    return make([]*websocket.Conn, 0)
+}
+
+func (h *GoquiturHandler) handleEvents(evchan <-chan *Event, errchan <-chan error) {
+    for {
+        select {
+        case ev := <-evchan:
+            log.Println("Got fs event!", ev)
+
+            loc, err := h.pathToLocation(ev.Path)
+            if err != nil {
+                log.Println("Error handling fs event:", err)
+                continue
+            }
+            log.Println("event at", loc)
+
+            switch ev.Type {
+            case EventCreate:
+                fallthrough
+            case EventModify:
+                fallthrough
+            case EventDelete:
+                var data []byte
+                for _, sub := range h.Subscribers(loc.WebPath) {
+                    if data == nil {
+                        // If we can't produce a valid value for data we can't
+                        // handle this event at all, so we break and move on
+                        // todo, set msg to some error thing and push that to
+                        // clients
+                        payload, err := getPayload(loc.FSPath)
+                        if err != nil {
+                            log.Println("Error getting message payload:", err)
+                            break
+                        }
+                        msg := &Message{
+                            Event: ev.Type,
+                            Path: loc.WebPath,
+                            Payload: payload,
+                        }
+                        data, err = json.Marshal(msg)
+                        if err != nil {
+                            log.Println("Error encoding message:", err)
+                            break
+                        }
+                    }
+                    go func() {
+                        _, err = sub.Write(data)
+                        if err != nil {
+                            log.Println("Error writing to a client:", err)
+                            // todo, remove subscriber?
+                        }
+                    }()
+                }
+            default:
+                log.Println("Unknown event, ignoring:", ev.Type)
+            }
+
+        case err := <-errchan:
+            log.Println("Error handling fs event:", err)
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/*
+func (h *GoquiturHandler) resAdded(res *Resource) error {
+    ores, err := h.openResource(res)
+    if err != nil {
+        return err
+    }
+    payload, err := ores.Payload()
+    if err != nil {
+        return err
+    }
+    h.notifyConns(res.WebPath, &State{
+        Event: EventCreate,
+        Path: res.WebPath,
+        Payload: payload,
+    })
+    if res.WebPath != "/" {
+        parent, err := h.resFromWebPath(filepath.Dir(res.WebPath))
+        if err != nil {
+            return err
+        }
+        return h.resModified(parent)
+    }
+    return nil
+}
+
+func (h *GoquiturHandler) resModified(res *Resource) error {
+    ores, err := h.openResource(res)
+    if err != nil {
+        return err
+    }
+    payload, err := ores.Payload()
+    if err != nil {
+        return err
+    }
+    h.notifyConns(res.WebPath, &State{
+        Event: EventModify,
+        Path: res.WebPath,
+        Payload: payload,
+    })
+    return nil
+}
+
+func (h *GoquiturHandler) resRemoved(res *Resource) error {
+    h.notifyConns(res.WebPath, &State{
+        Event: EventDelete,
+        Path: res.WebPath,
+    })
+    if res.WebPath != "/" {
+        parent, err := h.resFromWebPath(filepath.Dir(res.WebPath))
+        if err != nil {
+            return err
+        }
+        return h.resModified(parent)
+    }
+    return nil
+}
+
+////////////////////////////////////////////////////////////////////////////////
 func (h *GoquiturHandler) resFromWebPath(path string) (*Resource, error) {
     if path == "" {
         return nil, fmt.Errorf("Path must not be empty")

          
@@ 108,7 259,6 @@ func (h *GoquiturHandler) serveOneWS(ws 
             Details: err.Error(),
         })
     }
-
     // Open the resource
     ores, err := h.openResource(res)
     if err != nil {

          
@@ 140,35 290,6 @@ func (h *GoquiturHandler) serveOneWS(ws 
     })
 }
 
-func (h *GoquiturHandler) handleFsEvents() {
-    for {
-        log.Println("waiting for fs events")
-        select {
-        case ev := <-h.watcher.Event:
-            var err error
-            path := ev.Name[len(h.watchdir):]
-            log.Println("event:", ev, path)
-            res, err := h.resFromWebPath(path)
-            if err != nil {
-                log.Println("error resolving resource in handling fs event:", err)
-            } else {
-                if ev.IsDelete() || ev.IsRename() {
-                    err = h.resRemoved(res)
-                } else if ev.IsModify() {
-                    err = h.resModified(res)
-                } else if ev.IsCreate() {
-                    err = h.resAdded(res)
-                }
-                if err != nil {
-                    log.Println("error dealing with resource:", err)
-                }
-            }
-        case err := <-h.watcher.Error:
-            log.Println("error:", err)
-        }
-    }
-}
-
 func notifyConn(conn *websocket.Conn, msg interface{}) error {
     log.Printf("notifying %v with %+v", conn, msg)
     return websocket.JSON.Send(conn, msg)

          
@@ 186,59 307,4 @@ func (h *GoquiturHandler) notifyConns(pa
         }
     }
 }
-
-func (h *GoquiturHandler) resAdded(res *Resource) error {
-    ores, err := h.openResource(res)
-    if err != nil {
-        return err
-    }
-    payload, err := ores.Payload()
-    if err != nil {
-        return err
-    }
-    h.notifyConns(res.WebPath, &State{
-        Event: EventCreate,
-        Path: res.WebPath,
-        Payload: payload,
-    })
-    if res.WebPath != "/" {
-        parent, err := h.resFromWebPath(filepath.Dir(res.WebPath))
-        if err != nil {
-            return err
-        }
-        return h.resModified(parent)
-    }
-    return nil
-}
-
-func (h *GoquiturHandler) resModified(res *Resource) error {
-    ores, err := h.openResource(res)
-    if err != nil {
-        return err
-    }
-    payload, err := ores.Payload()
-    if err != nil {
-        return err
-    }
-    h.notifyConns(res.WebPath, &State{
-        Event: EventModify,
-        Path: res.WebPath,
-        Payload: payload,
-    })
-    return nil
-}
-
-func (h *GoquiturHandler) resRemoved(res *Resource) error {
-    h.notifyConns(res.WebPath, &State{
-        Event: EventDelete,
-        Path: res.WebPath,
-    })
-    if res.WebPath != "/" {
-        parent, err := h.resFromWebPath(filepath.Dir(res.WebPath))
-        if err != nil {
-            return err
-        }
-        return h.resModified(parent)
-    }
-    return nil
-}
+*/

          
A => watch.go +51 -0
@@ 0,0 1,51 @@ 
+package main
+
+import "log"
+import "path/filepath"
+import "github.com/howeyc/fsnotify"
+
+type Event struct {
+    Type        string
+    Path        string
+}
+
+func processLoop(w *fsnotify.Watcher, evchan chan<- *Event, errchan chan<- error) {
+    log.Println("Waiting for fs events")
+    for {
+        select {
+        case fsev := <-w.Event:
+            log.Println("event:", fsev)
+            var err error
+
+            path, err := filepath.Abs(fsev.Name)
+            if err != nil {
+                errchan <- err
+                continue
+            }
+
+            ev := new(Event)
+            ev.Path = path
+            if fsev.IsDelete() || fsev.IsRename() {
+                ev.Type = EventDelete
+            } else if fsev.IsModify() {
+                ev.Type = EventModify
+            } else if fsev.IsCreate() {
+                ev.Type = EventCreate
+            } else {
+                log.Println("unhandled event, don't know what to do with event of this type")
+                continue
+            }
+            evchan <- ev
+
+        case err := <-w.Error:
+            errchan <- err
+        }
+    }
+}
+
+func processWatcher(w *fsnotify.Watcher) (<-chan *Event, <-chan error) {
+    evchan := make(chan *Event, 20)
+    errchan := make(chan error, 20)
+    go processLoop(w, evchan, errchan)
+    return evchan, errchan
+}