]> matita.cs.unibo.it Git - helm.git/blobdiff - helm/graphs/tools/uriSetQueue.ml
ocaml 3.09 transition
[helm.git] / helm / graphs / tools / uriSetQueue.ml
index 2ce17a1412db9bd9caab66a8b55564e8b733b279..43027580016fe58101457ccc8ae38398ff8f0e8c 100644 (file)
  * http://cs.unibo.it/helm/.
  *)
 
+let debug = true;;
+let debug_print s = if debug then prerr_endline s;;
+Http_common.debug := debug;;
+
 open Printf;;
 
+let configuration_file = "/projects/helm/etc/uriSetQueue.conf.xml";;
 let daemon_name = "Uri Set Queue";;
-let default_port = 48082;;
-let port_env_var = "URI_SET_QUEUE_PORT";;
 
+module OrderedUri: Set.OrderedType with type t = string =
+  struct
+    type t = string
+    let compare = compare
+  end
+module UriSet = Set.Make (OrderedUri)
 type uri_queue = {
   mutable size: int;
   mutable overflowed: bool;
   uris: string Queue.t;
+  mutable olduris: UriSet.t;
 }
   (** raised when a queue is accessed before being defined *)
 exception Queue_not_found of int;;
   (** global uri_queue, used by all children *)
-let uri_queue = { size = 0; overflowed = false; uris = Queue.create () };;
+let uri_queue = {
+  size = 0; overflowed = false; uris = Queue.create (); olduris = UriSet.empty
+};;
 let (get_queue, add_queue, remove_queue) =
   let uri_queues = Hashtbl.create 17 in
   ((fun pid -> (* get_queue *)
@@ -48,7 +60,8 @@ let (get_queue, add_queue, remove_queue) =
     Hashtbl.replace
       uri_queues
       pid
-      { size = size; overflowed = false; uris = Queue.create () }),
+      { size = size; overflowed = false;
+        uris = Queue.create (); olduris = UriSet.empty }),
   (fun pid -> (* remove_queue *)
     try
       Hashtbl.remove uri_queues pid
@@ -63,100 +76,115 @@ let queue_mem item queue =  (* mem function over queues *)
   with Found -> true
 ;;
 
-let port =
-  try
-    int_of_string (Sys.getenv port_env_var)
-  with
-  | Not_found -> default_port
-  | Failure "int_of_string" ->
-      prerr_endline "Warning: invalid port, reverting to default";
-      default_port
-in
-let callback req outchan =
+let callback (req: Http_types.request) outchan =
   try
-    let res = new Http_response.response in
+    let res = new Http_response.response () in
     res#addBasicHeaders;
     res#setContentType "text/xml";
     (match req#path with
 
     | "/add_if_not_in" ->
         let (uri, pid) = (req#param "uri", int_of_string (req#param "PID")) in
+        debug_print (sprintf "Adding uri '%s' to queue '%d'" uri pid);
         let queue = get_queue pid in
         let result =
-          if not (queue_mem uri queue.uris) then begin (* uri not in *)
-            if Queue.length queue.uris >= queue.size then begin (* overflow! *)
+          if (Queue.length queue.uris) + (UriSet.cardinal queue.olduris) >=
+             queue.size
+          then
+            begin (* overflow! *)
               queue.overflowed <- true;
+              debug_print "Answer: not_added_because_already_too_many";
               "not_added_because_already_too_many"
-            end else begin  (* add the uri *)
-              Queue.add uri queue.uris;
-              "added"
+            end else begin  (* there's room for another uri *)
+              if (queue_mem uri queue.uris) || (UriSet.mem uri queue.olduris)
+              then
+                begin (* url already in *)
+                  debug_print "Answer: already_in";
+                  "already_in"
+                end else begin (* uri not in *)
+                  Queue.add uri queue.uris;
+                  debug_print "Answer: added";
+                  "added"
+                end
             end
-          end else (* url already in *)
-            "already_in"
         in
-        res#setContents (sprintf "<?xml version=\"1.0\"?><%s/>\n" result);
+        res#setBody (sprintf "<?xml version=\"1.0\"?>\n<%s/>\n" result);
+        if debug then res#serialize stderr;
         Http_daemon.respond_with res outchan
 
     | "/is_overflowed" ->
         let pid = int_of_string (req#param "PID") in
         let queue = get_queue pid in
         let result = string_of_bool (queue.overflowed) in
-        res#setContents (sprintf "<?xml version=\"1.0\"?><%s/>\n" result);
+        debug_print (sprintf "%d queue is_overflowed = %s" pid result);
+        res#setBody (sprintf "<?xml version=\"1.0\"?>\n<%s/>\n" result);
+        if debug then res#serialize stderr;
         Http_daemon.respond_with res outchan
 
     | "/set_uri_set_size" ->
         let (pid, size) =
           (int_of_string (req#param "PID"), int_of_string (req#param "size"))
         in
+        debug_print (sprintf "Setting size '%d' for queue '%d'" size pid);
         (try
           let queue = get_queue pid in
           queue.size <- size;
         with Queue_not_found p ->
           assert (p = pid);
           add_queue pid size);
-        res#setContents "<?xml version=\"1.0\">\n<done/>\n";
+        res#setBody "<?xml version=\"1.0\"?>\n<done/>\n";
+        if debug then res#serialize stderr;
         Http_daemon.respond_with res outchan
 
     | "/get_next" ->
         let pid = int_of_string (req#param "PID") in
+        debug_print (sprintf "Getting next uri from queue '%d'" pid);
         let queue = get_queue pid in
         let element = (* xml response's root element *)
           try
             let uri = Queue.take queue.uris in
+            queue.olduris <- UriSet.add uri queue.olduris;
             sprintf
               "<%suri value=\"%s\"/>"
               (if queue.overflowed then "marked_" else "")
               uri
           with Queue.Empty -> "<empty/>"
         in
-        res#setContents ("<?xml version=\"1.0\">" ^ element ^ "\n");
+        res#setBody ("<?xml version=\"1.0\"?>\n" ^ element ^ "\n");
+        if debug then res#serialize stderr;
         Http_daemon.respond_with res outchan
 
     | "/reset_to_empty" ->
         let pid = int_of_string (req#param "PID") in
         remove_queue pid;
-        res#setContents "<?xml version=\"1.0\">\n<done/>\n";
+        debug_print (sprintf "Resetting queue '%d'" pid);
+        res#setBody "<?xml version=\"1.0\"?>\n<done/>\n";
+        if debug then res#serialize stderr;
         Http_daemon.respond_with res outchan
 
     | invalid_request ->
-        Http_daemon.respond_error ~status:(`Client_error `Bad_request) outchan)
+        debug_print ("Invalid request received");
+        Http_daemon.respond_error
+          ~code:(`Status (`Client_error `Bad_request)) outchan);
+        prerr_endline "Request done!\n"
   with
-  | Http_request.Param_not_found attr_name ->
-      Http_daemon.respond_error
-        ~status:(`Client_error `Bad_request)
+  | Http_types.Param_not_found attr_name ->
+      Http_daemon.respond_error ~code:(`Status (`Client_error `Bad_request))
         ~body:(sprintf "Parameter '%s' is missing" attr_name)
         outchan
   | Failure "int_of_string" ->  (* error in converting some paramters *)
-      Http_daemon.respond_error ~status:(`Client_error `Bad_request) outchan
-  | Queue_not_found queue_name ->
       Http_daemon.respond_error
-        ~status:(`Client_error `Bad_request)
+        ~code:(`Status (`Client_error `Bad_request)) outchan
+  | Queue_not_found queue_name ->
+      Http_daemon.respond_error ~code:(`Status (`Client_error `Bad_request))
         ~body:(sprintf "Queue '%d' is not defined" queue_name)
         outchan
 in
 
+Helm_registry.load_from configuration_file;
+let port = Helm_registry.get_int "uri_set_queue.port" in
 printf "%s started and listening on port %d\n" daemon_name port;
 flush stdout;
-Http_daemon.start' ~port ~fork:false callback;
+Http_daemon.start' ~port ~mode:`Thread callback;
 printf "%s is terminating, bye!\n" daemon_name