X-Git-Url: http://matita.cs.unibo.it/gitweb/?a=blobdiff_plain;f=helm%2Fgraphs%2Ftools%2FuriSetQueue.ml;h=43027580016fe58101457ccc8ae38398ff8f0e8c;hb=4167cea65ca58897d1a3dbb81ff95de5074700cc;hp=2ce17a1412db9bd9caab66a8b55564e8b733b279;hpb=4c67cc0208078e3952a5146563d4d49116f7edf4;p=helm.git diff --git a/helm/graphs/tools/uriSetQueue.ml b/helm/graphs/tools/uriSetQueue.ml index 2ce17a141..430275800 100644 --- a/helm/graphs/tools/uriSetQueue.ml +++ b/helm/graphs/tools/uriSetQueue.ml @@ -23,21 +23,33 @@ * http://cs.unibo.it/helm/. *) +let debug = true;; +let debug_print s = if debug then prerr_endline s;; +Http_common.debug := debug;; + open Printf;; +let configuration_file = "/projects/helm/etc/uriSetQueue.conf.xml";; let daemon_name = "Uri Set Queue";; -let default_port = 48082;; -let port_env_var = "URI_SET_QUEUE_PORT";; +module OrderedUri: Set.OrderedType with type t = string = + struct + type t = string + let compare = compare + end +module UriSet = Set.Make (OrderedUri) type uri_queue = { mutable size: int; mutable overflowed: bool; uris: string Queue.t; + mutable olduris: UriSet.t; } (** raised when a queue is accessed before being defined *) exception Queue_not_found of int;; (** global uri_queue, used by all children *) -let uri_queue = { size = 0; overflowed = false; uris = Queue.create () };; +let uri_queue = { + size = 0; overflowed = false; uris = Queue.create (); olduris = UriSet.empty +};; let (get_queue, add_queue, remove_queue) = let uri_queues = Hashtbl.create 17 in ((fun pid -> (* get_queue *) @@ -48,7 +60,8 @@ let (get_queue, add_queue, remove_queue) = Hashtbl.replace uri_queues pid - { size = size; overflowed = false; uris = Queue.create () }), + { size = size; overflowed = false; + uris = Queue.create (); olduris = UriSet.empty }), (fun pid -> (* remove_queue *) try Hashtbl.remove uri_queues pid @@ -63,100 +76,115 @@ let queue_mem item queue = (* mem function over queues *) with Found -> true ;; -let port = - try - int_of_string (Sys.getenv port_env_var) - with - | Not_found -> default_port - | Failure "int_of_string" -> - prerr_endline "Warning: invalid port, reverting to default"; - default_port -in -let callback req outchan = +let callback (req: Http_types.request) outchan = try - let res = new Http_response.response in + let res = new Http_response.response () in res#addBasicHeaders; res#setContentType "text/xml"; (match req#path with | "/add_if_not_in" -> let (uri, pid) = (req#param "uri", int_of_string (req#param "PID")) in + debug_print (sprintf "Adding uri '%s' to queue '%d'" uri pid); let queue = get_queue pid in let result = - if not (queue_mem uri queue.uris) then begin (* uri not in *) - if Queue.length queue.uris >= queue.size then begin (* overflow! *) + if (Queue.length queue.uris) + (UriSet.cardinal queue.olduris) >= + queue.size + then + begin (* overflow! *) queue.overflowed <- true; + debug_print "Answer: not_added_because_already_too_many"; "not_added_because_already_too_many" - end else begin (* add the uri *) - Queue.add uri queue.uris; - "added" + end else begin (* there's room for another uri *) + if (queue_mem uri queue.uris) || (UriSet.mem uri queue.olduris) + then + begin (* url already in *) + debug_print "Answer: already_in"; + "already_in" + end else begin (* uri not in *) + Queue.add uri queue.uris; + debug_print "Answer: added"; + "added" + end end - end else (* url already in *) - "already_in" in - res#setContents (sprintf "<%s/>\n" result); + res#setBody (sprintf "\n<%s/>\n" result); + if debug then res#serialize stderr; Http_daemon.respond_with res outchan | "/is_overflowed" -> let pid = int_of_string (req#param "PID") in let queue = get_queue pid in let result = string_of_bool (queue.overflowed) in - res#setContents (sprintf "<%s/>\n" result); + debug_print (sprintf "%d queue is_overflowed = %s" pid result); + res#setBody (sprintf "\n<%s/>\n" result); + if debug then res#serialize stderr; Http_daemon.respond_with res outchan | "/set_uri_set_size" -> let (pid, size) = (int_of_string (req#param "PID"), int_of_string (req#param "size")) in + debug_print (sprintf "Setting size '%d' for queue '%d'" size pid); (try let queue = get_queue pid in queue.size <- size; with Queue_not_found p -> assert (p = pid); add_queue pid size); - res#setContents "\n\n"; + res#setBody "\n\n"; + if debug then res#serialize stderr; Http_daemon.respond_with res outchan | "/get_next" -> let pid = int_of_string (req#param "PID") in + debug_print (sprintf "Getting next uri from queue '%d'" pid); let queue = get_queue pid in let element = (* xml response's root element *) try let uri = Queue.take queue.uris in + queue.olduris <- UriSet.add uri queue.olduris; sprintf "<%suri value=\"%s\"/>" (if queue.overflowed then "marked_" else "") uri with Queue.Empty -> "" in - res#setContents ("" ^ element ^ "\n"); + res#setBody ("\n" ^ element ^ "\n"); + if debug then res#serialize stderr; Http_daemon.respond_with res outchan | "/reset_to_empty" -> let pid = int_of_string (req#param "PID") in remove_queue pid; - res#setContents "\n\n"; + debug_print (sprintf "Resetting queue '%d'" pid); + res#setBody "\n\n"; + if debug then res#serialize stderr; Http_daemon.respond_with res outchan | invalid_request -> - Http_daemon.respond_error ~status:(`Client_error `Bad_request) outchan) + debug_print ("Invalid request received"); + Http_daemon.respond_error + ~code:(`Status (`Client_error `Bad_request)) outchan); + prerr_endline "Request done!\n" with - | Http_request.Param_not_found attr_name -> - Http_daemon.respond_error - ~status:(`Client_error `Bad_request) + | Http_types.Param_not_found attr_name -> + Http_daemon.respond_error ~code:(`Status (`Client_error `Bad_request)) ~body:(sprintf "Parameter '%s' is missing" attr_name) outchan | Failure "int_of_string" -> (* error in converting some paramters *) - Http_daemon.respond_error ~status:(`Client_error `Bad_request) outchan - | Queue_not_found queue_name -> Http_daemon.respond_error - ~status:(`Client_error `Bad_request) + ~code:(`Status (`Client_error `Bad_request)) outchan + | Queue_not_found queue_name -> + Http_daemon.respond_error ~code:(`Status (`Client_error `Bad_request)) ~body:(sprintf "Queue '%d' is not defined" queue_name) outchan in +Helm_registry.load_from configuration_file; +let port = Helm_registry.get_int "uri_set_queue.port" in printf "%s started and listening on port %d\n" daemon_name port; flush stdout; -Http_daemon.start' ~port ~fork:false callback; +Http_daemon.start' ~port ~mode:`Thread callback; printf "%s is terminating, bye!\n" daemon_name