@@ 1,87 0,0 @@
-package main
-
-import "encoding/json"
-import "fmt"
-import "os"
-import "io"
-import "io/ioutil"
-
-const (
- EventCreate = "create"
- EventModify = "modify"
- EventDelete = "delete"
-)
-
-var (
- ResourceNotFound = fmt.Errorf("Required resource was not found")
-)
-
-type Request struct {
- Subscribe bool
- Path string
-}
-
-type Error struct {
- Message string
- Details string
-}
-
-type State struct {
- Event string `json:",omitempty"`
- Path string
- Payload interface{}
-}
-
-type DelayedCachedReader struct {
- File io.Reader
- IsLoaded bool
- data []byte
-}
-
-func NewDelayedCachedReader(r io.Reader) *DelayedCachedReader {
- return &DelayedCachedReader{r, false, nil}
-}
-
-// This is nasty
-func (r *DelayedCachedReader) MarshalJSON() ([]byte, error) {
- if r.IsLoaded == false {
- data, err := ioutil.ReadAll(r.File)
- if err != nil {
- return nil, err
- }
- data, err = json.Marshal(data)
- if err != nil {
- return nil, err
- }
- r.IsLoaded = true
- r.data = data
- }
- return r.data, nil
-}
-
-type Resource struct {
- WebPath string
- FSPath string
-}
-
-type OpenResource struct {
- *Resource
- *os.File
- os.FileInfo
-}
-
-func (r *OpenResource) Payload() (interface{}, error) {
- if r.IsDir() {
- fis, err := r.Readdir(0)
- if err != nil {
- return nil, err
- }
- payload := make([]string, len(fis))
- for i, fi := range fis {
- payload[i] = fi.Name()
- }
- return payload, nil
- } else {
- return NewDelayedCachedReader(r), nil
- }
-}
@@ 0,0 1,76 @@
+/* Includes stuff for dealing with resources
+
+A resource is an object that corresponds to something on the filesystem and
+is provided to clients by the service; they are generally either files or
+directories.
+*/
+
+package main
+
+import "fmt"
+import "os"
+import "io/ioutil"
+
+const (
+ EventCreate = "create"
+ EventModify = "modify"
+ EventDelete = "delete"
+)
+
+var (
+ InvalidPath = fmt.Errorf("Invalid path")
+ ResourceNotFound = fmt.Errorf("Required resource was not found")
+)
+
+type Request struct {
+ Subscribe bool
+ Path string
+}
+
+type Error struct {
+ Message string
+ Details string
+}
+
+type Message struct {
+ Event string `json:",omitempty"`
+ //URL string <- todo, do this
+ Path string
+ Payload interface{} `json:",omitempty"`
+}
+
+type Location struct {
+ WebPath string
+ FSPath string
+}
+
+type OpenResource struct {
+ *Location
+ *os.File
+ os.FileInfo
+}
+
+func getPayload(path string) (interface{}, error) {
+ f, err := os.Open(path)
+ if err != nil {
+ return nil, err
+ }
+ stat, err := f.Stat()
+ if err != nil {
+ return nil, err
+ }
+
+ if stat.IsDir() {
+ dirnames, err := f.Readdirnames(0)
+ if err != nil {
+ return nil, err
+ }
+ return dirnames, nil
+ } else {
+ data, err := ioutil.ReadAll(f)
+ if err != nil {
+ return nil, err
+ }
+ return string(data), nil
+ }
+}
@@ 1,20 1,28 @@
package main
import "net/http"
-import "os"
+import _"os"
import "log"
-import "fmt"
-import "strings"
+import "sync"
+import _"fmt"
+import _"strings"
import "path/filepath"
-
+import "encoding/json"
import "code.google.com/p/go.net/websocket"
import "github.com/howeyc/fsnotify"
+// It shall be simple to:
+// find clients given a resource
+// remove a cilent from all resources
+// add a client to one resource
+
type GoquiturHandler struct {
*http.ServeMux
watchdir string
watcher *fsnotify.Watcher
- subs map[string][]*websocket.Conn
+ maplock *sync.Mutex
+ urlmap map[string][]*websocket.Conn
+ submap map[*websocket.Conn][]string
}
func NewGoquiturHandler(watchdir string) (*GoquiturHandler, error) {
@@ 22,6 30,7 @@ func NewGoquiturHandler(watchdir string)
if err != nil {
return nil, err
}
+
watcher, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
@@ 34,14 43,156 @@ func NewGoquiturHandler(watchdir string)
ServeMux: http.NewServeMux(),
watchdir: watchdir,
watcher: watcher,
- subs: make(map[string][]*websocket.Conn),
+ maplock: new(sync.Mutex),
+ urlmap: make(map[string][]*websocket.Conn),
+ submap: make(map[*websocket.Conn][]string),
}
h.ServeMux.Handle("/", http.FileServer(http.Dir("static")))
- h.ServeMux.Handle("/ws", websocket.Handler(h.serveWS))
- go h.handleFsEvents()
+ //h.ServeMux.Handle("/ws", websocket.Handler(h.serveWS))
+
+ evchan, errchan := processWatcher(h.watcher)
+
+ go h.handleEvents(evchan, errchan)
+
return h, nil
}
+func (h *GoquiturHandler) pathToLocation(path string) (*Location, error) {
+ webpath, err := filepath.Rel(h.watchdir, path)
+ if err != nil {
+ return nil, err
+ }
+ return &Location{webpath, path}, nil
+}
+
+func (h *GoquiturHandler) Subscribers(path string) []*websocket.Conn {
+ h.maplock.Lock()
+ defer h.maplock.Unlock()
+ if v, ok := h.urlmap[path]; ok == true {
+ return v
+ }
+ return make([]*websocket.Conn, 0)
+}
+
+func (h *GoquiturHandler) handleEvents(evchan <-chan *Event, errchan <-chan error) {
+ for {
+ select {
+ case ev := <-evchan:
+ log.Println("Got fs event!", ev)
+
+ loc, err := h.pathToLocation(ev.Path)
+ if err != nil {
+ log.Println("Error handling fs event:", err)
+ continue
+ }
+ log.Println("event at", loc)
+
+ switch ev.Type {
+ case EventCreate:
+ fallthrough
+ case EventModify:
+ fallthrough
+ case EventDelete:
+ var data []byte
+ for _, sub := range h.Subscribers(loc.WebPath) {
+ if data == nil {
+ // If we can't produce a valid value for data we can't
+ // handle this event at all, so we break and move on
+ // todo, set msg to some error thing and push that to
+ // clients
+ payload, err := getPayload(loc.FSPath)
+ if err != nil {
+ log.Println("Error getting message payload:", err)
+ break
+ }
+ msg := &Message{
+ Event: ev.Type,
+ Path: loc.WebPath,
+ Payload: payload,
+ }
+ data, err = json.Marshal(msg)
+ if err != nil {
+ log.Println("Error encoding message:", err)
+ break
+ }
+ }
+ go func() {
+ _, err = sub.Write(data)
+ if err != nil {
+ log.Println("Error writing to a client:", err)
+ // todo, remove subscriber?
+ }
+ }()
+ }
+ default:
+ log.Println("Unknown event, ignoring:", ev.Type)
+ }
+
+ case err := <-errchan:
+ log.Println("Error handling fs event:", err)
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/*
+func (h *GoquiturHandler) resAdded(res *Resource) error {
+ ores, err := h.openResource(res)
+ if err != nil {
+ return err
+ }
+ payload, err := ores.Payload()
+ if err != nil {
+ return err
+ }
+ h.notifyConns(res.WebPath, &State{
+ Event: EventCreate,
+ Path: res.WebPath,
+ Payload: payload,
+ })
+ if res.WebPath != "/" {
+ parent, err := h.resFromWebPath(filepath.Dir(res.WebPath))
+ if err != nil {
+ return err
+ }
+ return h.resModified(parent)
+ }
+ return nil
+}
+
+func (h *GoquiturHandler) resModified(res *Resource) error {
+ ores, err := h.openResource(res)
+ if err != nil {
+ return err
+ }
+ payload, err := ores.Payload()
+ if err != nil {
+ return err
+ }
+ h.notifyConns(res.WebPath, &State{
+ Event: EventModify,
+ Path: res.WebPath,
+ Payload: payload,
+ })
+ return nil
+}
+
+func (h *GoquiturHandler) resRemoved(res *Resource) error {
+ h.notifyConns(res.WebPath, &State{
+ Event: EventDelete,
+ Path: res.WebPath,
+ })
+ if res.WebPath != "/" {
+ parent, err := h.resFromWebPath(filepath.Dir(res.WebPath))
+ if err != nil {
+ return err
+ }
+ return h.resModified(parent)
+ }
+ return nil
+}
+
+////////////////////////////////////////////////////////////////////////////////
func (h *GoquiturHandler) resFromWebPath(path string) (*Resource, error) {
if path == "" {
return nil, fmt.Errorf("Path must not be empty")
@@ 108,7 259,6 @@ func (h *GoquiturHandler) serveOneWS(ws
Details: err.Error(),
})
}
-
// Open the resource
ores, err := h.openResource(res)
if err != nil {
@@ 140,35 290,6 @@ func (h *GoquiturHandler) serveOneWS(ws
})
}
-func (h *GoquiturHandler) handleFsEvents() {
- for {
- log.Println("waiting for fs events")
- select {
- case ev := <-h.watcher.Event:
- var err error
- path := ev.Name[len(h.watchdir):]
- log.Println("event:", ev, path)
- res, err := h.resFromWebPath(path)
- if err != nil {
- log.Println("error resolving resource in handling fs event:", err)
- } else {
- if ev.IsDelete() || ev.IsRename() {
- err = h.resRemoved(res)
- } else if ev.IsModify() {
- err = h.resModified(res)
- } else if ev.IsCreate() {
- err = h.resAdded(res)
- }
- if err != nil {
- log.Println("error dealing with resource:", err)
- }
- }
- case err := <-h.watcher.Error:
- log.Println("error:", err)
- }
- }
-}
-
func notifyConn(conn *websocket.Conn, msg interface{}) error {
log.Printf("notifying %v with %+v", conn, msg)
return websocket.JSON.Send(conn, msg)
@@ 186,59 307,4 @@ func (h *GoquiturHandler) notifyConns(pa
}
}
}
-
-func (h *GoquiturHandler) resAdded(res *Resource) error {
- ores, err := h.openResource(res)
- if err != nil {
- return err
- }
- payload, err := ores.Payload()
- if err != nil {
- return err
- }
- h.notifyConns(res.WebPath, &State{
- Event: EventCreate,
- Path: res.WebPath,
- Payload: payload,
- })
- if res.WebPath != "/" {
- parent, err := h.resFromWebPath(filepath.Dir(res.WebPath))
- if err != nil {
- return err
- }
- return h.resModified(parent)
- }
- return nil
-}
-
-func (h *GoquiturHandler) resModified(res *Resource) error {
- ores, err := h.openResource(res)
- if err != nil {
- return err
- }
- payload, err := ores.Payload()
- if err != nil {
- return err
- }
- h.notifyConns(res.WebPath, &State{
- Event: EventModify,
- Path: res.WebPath,
- Payload: payload,
- })
- return nil
-}
-
-func (h *GoquiturHandler) resRemoved(res *Resource) error {
- h.notifyConns(res.WebPath, &State{
- Event: EventDelete,
- Path: res.WebPath,
- })
- if res.WebPath != "/" {
- parent, err := h.resFromWebPath(filepath.Dir(res.WebPath))
- if err != nil {
- return err
- }
- return h.resModified(parent)
- }
- return nil
-}
+*/