* http://cs.unibo.it/helm/.
*)
+Http_common.debug := true;;
+let debug = true;;
+let debug_print s = if debug then prerr_endline s;;
+
open Printf;;
let daemon_name = "Uri Set Queue";;
let default_port = 48082;;
let port_env_var = "URI_SET_QUEUE_PORT";;
+module OrderedUri: Set.OrderedType with type t = string =
+ struct
+ type t = string
+ let compare = compare
+ end
+module UriSet = Set.Make (OrderedUri)
type uri_queue = {
mutable size: int;
mutable overflowed: bool;
uris: string Queue.t;
+ mutable olduris: UriSet.t;
}
(** raised when a queue is accessed before being defined *)
exception Queue_not_found of int;;
(** global uri_queue, used by all children *)
-let uri_queue = { size = 0; overflowed = false; uris = Queue.create () };;
+let uri_queue = {
+ size = 0; overflowed = false; uris = Queue.create (); olduris = UriSet.empty
+};;
let (get_queue, add_queue, remove_queue) =
let uri_queues = Hashtbl.create 17 in
((fun pid -> (* get_queue *)
Hashtbl.replace
uri_queues
pid
- { size = size; overflowed = false; uris = Queue.create () }),
+ { size = size; overflowed = false;
+ uris = Queue.create (); olduris = UriSet.empty }),
(fun pid -> (* remove_queue *)
try
Hashtbl.remove uri_queues pid
| "/add_if_not_in" ->
let (uri, pid) = (req#param "uri", int_of_string (req#param "PID")) in
+ debug_print (sprintf "Adding uri '%s' to queue '%d'" uri pid);
let queue = get_queue pid in
let result =
- if not (queue_mem uri queue.uris) then begin (* uri not in *)
- if Queue.length queue.uris >= queue.size then begin (* overflow! *)
+ if (Queue.length queue.uris) + (UriSet.cardinal queue.olduris) >=
+ queue.size
+ then
+ begin (* overflow! *)
queue.overflowed <- true;
+ debug_print "Answer: not_added_because_already_too_many";
"not_added_because_already_too_many"
- end else begin (* add the uri *)
- Queue.add uri queue.uris;
- "added"
+ end else begin (* there's room for another uri *)
+ if (queue_mem uri queue.uris) || (UriSet.mem uri queue.olduris)
+ then
+ begin (* url already in *)
+ debug_print "Answer: already_in";
+ "already_in"
+ end else begin (* uri not in *)
+ Queue.add uri queue.uris;
+ debug_print "Answer: added";
+ "added"
+ end
end
- end else (* url already in *)
- "already_in"
in
- res#setContents (sprintf "<?xml version=\"1.0\"?><%s/>\n" result);
+ res#setContents (sprintf "<?xml version=\"1.0\"?>\n<%s/>\n" result);
+ if debug then res#serialize stderr;
Http_daemon.respond_with res outchan
| "/is_overflowed" ->
let pid = int_of_string (req#param "PID") in
let queue = get_queue pid in
let result = string_of_bool (queue.overflowed) in
- res#setContents (sprintf "<?xml version=\"1.0\"?><%s/>\n" result);
+ debug_print (sprintf "%d queue is_overflowed = %s" pid result);
+ res#setContents (sprintf "<?xml version=\"1.0\"?>\n<%s/>\n" result);
+ if debug then res#serialize stderr;
Http_daemon.respond_with res outchan
| "/set_uri_set_size" ->
let (pid, size) =
(int_of_string (req#param "PID"), int_of_string (req#param "size"))
in
+ debug_print (sprintf "Setting size '%d' for queue '%d'" size pid);
(try
let queue = get_queue pid in
queue.size <- size;
with Queue_not_found p ->
assert (p = pid);
add_queue pid size);
- res#setContents "<?xml version=\"1.0\">\n<done/>\n";
+ res#setContents "<?xml version=\"1.0\"?>\n<done/>\n";
+ if debug then res#serialize stderr;
Http_daemon.respond_with res outchan
| "/get_next" ->
let pid = int_of_string (req#param "PID") in
+ debug_print (sprintf "Getting next uri from queue '%d'" pid);
let queue = get_queue pid in
let element = (* xml response's root element *)
try
let uri = Queue.take queue.uris in
+ queue.olduris <- UriSet.add uri queue.olduris;
sprintf
"<%suri value=\"%s\"/>"
(if queue.overflowed then "marked_" else "")
uri
with Queue.Empty -> "<empty/>"
in
- res#setContents ("<?xml version=\"1.0\">" ^ element ^ "\n");
+ res#setContents ("<?xml version=\"1.0\"?>\n" ^ element ^ "\n");
+ if debug then res#serialize stderr;
Http_daemon.respond_with res outchan
| "/reset_to_empty" ->
let pid = int_of_string (req#param "PID") in
remove_queue pid;
- res#setContents "<?xml version=\"1.0\">\n<done/>\n";
+ debug_print (sprintf "Resetting queue '%d'" pid);
+ res#setContents "<?xml version=\"1.0\"?>\n<done/>\n";
+ if debug then res#serialize stderr;
Http_daemon.respond_with res outchan
| invalid_request ->
- Http_daemon.respond_error ~status:(`Client_error `Bad_request) outchan)
+ debug_print ("Invalid request received");
+ Http_daemon.respond_error ~status:(`Client_error `Bad_request) outchan);
+ prerr_endline "Request done!\n"
with
| Http_request.Param_not_found attr_name ->
Http_daemon.respond_error