X-Git-Url: http://matita.cs.unibo.it/gitweb/?a=blobdiff_plain;f=helm%2Fhbugs%2Fbroker%2Fhbugs_broker.ml;h=2ff8b98349dbf11853fbff1b676b706195b7c0df;hb=1fb8d0192e1f7ee891c53dc282c9c9f111e63e3c;hp=ab3a3dfcfc92c88049aeae53b30d84ab21ce4dba;hpb=d6f4e5a43fbd054835d2323e7d5351741bd2ad3b;p=helm.git diff --git a/helm/hbugs/broker/hbugs_broker.ml b/helm/hbugs/broker/hbugs_broker.ml index ab3a3dfcf..2ff8b9834 100644 --- a/helm/hbugs/broker/hbugs_broker.ml +++ b/helm/hbugs/broker/hbugs_broker.ml @@ -31,7 +31,7 @@ open Printf;; let debug = true ;; let debug_print s = if debug then prerr_endline s ;; -(* Http_common.debug := true;; *) +Http_common.debug := false;; let daemon_name = "H-Bugs Broker" ;; let default_port = 49081 ;; @@ -53,27 +53,53 @@ let return_xml_msg body outchan = Http_daemon.respond ~headers:["Content-Type", "text/xml"] ~body outchan ;; let parse_musing_id = function - | Musing_started (_, musing_id) -> musing_id + | Musing_started (_, musing_id) -> + prerr_endline ("#### Started musing ID: " ^ musing_id); + musing_id | Musing_aborted (_, musing_id) -> musing_id - | _ -> assert false + | msg -> + prerr_endline (sprintf "Assertion failed, received msg: %s" + (Hbugs_messages.string_of_msg msg)); + assert false ;; let do_critical = let mutex = Mutex.create () in fun action -> try - Mutex.lock mutex; let res = Lazy.force action in Mutex.unlock mutex; res +(* debug_print "Acquiring lock ..."; *) + Mutex.lock mutex; +(* debug_print "Lock Acquired!"; *) + let res = Lazy.force action in +(* debug_print "Releaseing lock ..."; *) + Mutex.unlock mutex; +(* debug_print "Lock released!"; *) + res with e -> Mutex.unlock mutex; raise e ;; + (* registries *) let clients = new Hbugs_broker_registry.clients in let tutors = new Hbugs_broker_registry.tutors in let musings = new Hbugs_broker_registry.musings in +let registries = + [ (clients :> Hbugs_broker_registry.registry); + (tutors :> Hbugs_broker_registry.registry); + (musings :> Hbugs_broker_registry.registry) ] +in + let my_own_id = Hbugs_id_generator.new_broker_id () in -let handle_msg outchan = function + (* debugging: dump broker internal status, used by '/dump' method *) +let dump_registries () = + assert debug; + String.concat "\n" (List.map (fun o -> o#dump) registries) +in +let handle_msg outchan msg = (* messages from clients *) + (match msg with + | Help -> Hbugs_messages.respond_msg (Usage usage_string) outchan | Register_client (client_id, client_url) -> do_critical (lazy ( @@ -92,7 +118,9 @@ let handle_msg outchan = function )) | List_tutors client_id -> do_critical (lazy ( if clients#isAuthenticated client_id then begin - Hbugs_messages.respond_msg (Tutor_list (my_own_id, tutors#index)) outchan + Hbugs_messages.respond_msg + (Tutor_list (my_own_id, tutors#index)) + outchan end else Hbugs_messages.respond_exc "forbidden" client_id outchan )) @@ -117,6 +145,10 @@ let handle_msg outchan = function | State_change (client_id, new_state) -> do_critical (lazy ( if clients#isAuthenticated client_id then begin let active_musings = musings#getByClientId client_id in + prerr_endline (sprintf "ACTIVE MUSINGS: %s" (String.concat ", " active_musings)); + if List.length active_musings = 0 then + prerr_endline ("No active musings for client " ^ client_id); + prerr_endline "CSC: State change!!!" ; let stop_answers = List.map (* collect Abort_musing message's responses *) (fun id -> (* musing id *) @@ -125,29 +157,39 @@ let handle_msg outchan = function ~url:(tutors#getUrl tutor) (Abort_musing (my_own_id, id))) active_musings in + let stopped_musing_ids = List.map parse_musing_id stop_answers in List.iter musings#unregister active_musings; - let started_musing_ids = - List.map (* register new musings and collect their ids *) - (fun tutor_id -> - let res = - Hbugs_messages.submit_req - ~url:(tutors#getUrl tutor_id) - (Start_musing (my_own_id, new_state)) - in - let musing_id = parse_musing_id res in - musings#register musing_id client_id tutor_id; - musing_id) - (clients#getSubscription client_id) - in - let stopped_musing_ids = List.map parse_musing_id stop_answers in - Hbugs_messages.respond_msg - (State_accepted (my_own_id, stopped_musing_ids, started_musing_ids)) - outchan + (match new_state with + | Some new_state -> (* need to start new musings *) + let subscriptions = clients#getSubscription client_id in + if List.length subscriptions = 0 then + prerr_endline ("No subscriptions for client " ^ client_id); + let started_musing_ids = + List.map (* register new musings and collect their ids *) + (fun tutor_id -> + let res = + Hbugs_messages.submit_req + ~url:(tutors#getUrl tutor_id) + (Start_musing (my_own_id, new_state)) + in + let musing_id = parse_musing_id res in + musings#register musing_id client_id tutor_id; + musing_id) + subscriptions + in + Hbugs_messages.respond_msg + (State_accepted (my_own_id, stopped_musing_ids, started_musing_ids)) + outchan + | None -> (* no need to start new musings *) + Hbugs_messages.respond_msg + (State_accepted (my_own_id, stopped_musing_ids, [])) + outchan) end else Hbugs_messages.respond_exc "forbidden" client_id outchan )) (* messages from tutors *) + | Register_tutor (tutor_id, tutor_url, hint_type, dsc) -> do_critical (lazy ( try tutors#register tutor_id tutor_url hint_type dsc; @@ -162,32 +204,41 @@ let handle_msg outchan = function end else Hbugs_messages.respond_exc "forbidden" tutor_id outchan )) + | Musing_completed (tutor_id, musing_id, result) -> do_critical (lazy ( - if tutors#isAuthenticated tutor_id then begin + if not (tutors#isAuthenticated tutor_id) then begin (* unauthorized *) + Hbugs_messages.respond_exc "forbidden" tutor_id outchan; + end else if not (musings#isActive musing_id) then begin (* too late *) + Hbugs_messages.respond_msg (Too_late (my_own_id, musing_id)) outchan; + end else begin (* all is ok: autorhized and on time *) (match result with | Sorry -> () - | Eureka extras -> + | Eureka hint -> + let client_url = + clients#getUrl (fst (musings#getByMusingId musing_id)) + in let res = - let hint = (* TODO decidere la hint *) "hint!!!!" in - let url = - clients#getUrl (fst (musings#getByMusingId musing_id)) - in - Hbugs_messages.submit_req ~url (Hint (my_own_id, hint)) + Hbugs_messages.submit_req ~url:client_url (Hint (my_own_id, hint)) in - ignore res (* TODO mi interessa la risposta? *) - ); + (match res with + | Wow _ -> () (* ok: client is happy with our hint *) + | unexpected_msg -> + prerr_endline + (sprintf + "Warning: unexpected msg from client: %s\nExpected was: Wow" + (Hbugs_messages.string_of_msg msg)))); Hbugs_messages.respond_msg (Thanks (my_own_id, musing_id)) outchan; musings#unregister musing_id - end else - Hbugs_messages.respond_exc "forbidden" tutor_id outchan + end )) | msg -> (* unexpected message *) debug_print "Unknown message!"; Hbugs_messages.respond_exc - "unexpected_msg" (Hbugs_messages.string_of_msg msg) outchan + "unexpected_msg" (Hbugs_messages.string_of_msg msg) outchan) in -let handle_msg outchan = (* debugging wrapper around 'handle_msg' *) +(* (* DEBUGGING wrapper around 'handle_msg' *) +let handle_msg outchan = if debug then (fun msg -> (* filter handle_msg through a function which dumps input messages *) @@ -196,6 +247,7 @@ let handle_msg outchan = (* debugging wrapper around 'handle_msg' *) else handle_msg outchan in +*) (* thread action *) let callback (req: Http_types.request) outchan = @@ -205,7 +257,14 @@ let callback (req: Http_types.request) outchan = (match req#path with (* TODO write help message *) | "/help" -> return_xml_msg " not yet written " outchan - | "/act" -> handle_msg outchan (Hbugs_messages.msg_of_string req#body) + | "/act" -> + let msg = Hbugs_messages.msg_of_string req#body in + handle_msg outchan msg + | "/dump" -> + if debug then + Http_daemon.respond ~body:(dump_registries ()) outchan + else + Http_daemon.respond_error ~code:400 outchan | _ -> Http_daemon.respond_error ~code:400 outchan); debug_print "Done!\n" with @@ -216,10 +275,18 @@ let callback (req: Http_types.request) outchan = "uncaught_exception" (Printexc.to_string exc) outchan in -(* TODO aggiungere lo spazzino che elimina i client/tutor/computation che non si -fanno sentire da troppo tempo ... *) - (* start daemon *) + (* thread who cleans up ancient client/tutor/musing registrations *) +let ragman () = + let delay = 3600.0 in (* 1 hour delay *) + while true do + Thread.delay delay; + List.iter (fun o -> o#purge) registries + done +in + + (* start daemon *) printf "Listening on port %d ...\n" port; flush stdout; +ignore (Thread.create ragman ()); Http_daemon.start' ~port ~mode:`Thread callback