Fixed qp_run

This commit is contained in:
Anthony Scemama 2017-11-27 17:18:14 +01:00
parent a672cdb445
commit fa7505ceca
5 changed files with 54 additions and 53 deletions

View File

@ -245,7 +245,7 @@ end = struct
(Id.Client.to_int x.client_id)
end
(** GetTaskReply : Reply to the GetTasks message *)
(** GetTaskReply : Reply to the GetTask message *)
module GetTaskReply_msg : sig
type t
val create : task_id:Id.Task.t option -> task:string option -> t
@ -292,19 +292,22 @@ end
(** GetTasksReply : Reply to the GetTasks message *)
module GetTasksReply_msg : sig
type t = (Id.Task.t * string) list
type t = (Id.Task.t option * string) list
val create : t -> t
val to_string : t -> string
val to_string_list : t -> string list
end = struct
type t = (Id.Task.t * string) list
type t = (Id.Task.t option * string) list
let create l = l
let to_string _ =
"get_tasks_reply ok"
let to_string_list x =
"get_tasks_reply ok" :: (
List.map x ~f:(fun (task_id, task) -> Printf.sprintf "%d %s" (Id.Task.to_int task_id) task)
)
List.map x ~f:(fun (task_id, task) ->
match task_id with
| Some task_id -> Printf.sprintf "%d %s" (Id.Task.to_int task_id) task
| None -> Printf.sprintf "0 terminate"
) )
end
@ -552,5 +555,6 @@ let to_string = function
let to_string_list = function
| GetDataReply x -> GetDataReply_msg.to_string_list x
| GetTasksReply x -> GetTasksReply_msg.to_string_list x
| _ -> assert false

View File

@ -434,8 +434,8 @@ let get_tasks msg program_state rep_socket pair_socket =
in
match (task_id, task) with
| Some task_id, Some task ->
build_list ( (task_id, task)::accu ) new_queue (n-1)
| _ -> queue, ((Id.Task.of_int 0, "terminate")::accu)
build_list ( (Some task_id, task)::accu ) new_queue (n-1)
| _ -> queue, (None, "terminate")::accu
in
let new_queue, result =

View File

@ -29,7 +29,7 @@ let run slave exe ezfio_file =
try
List.iter [ 0;1;2;3;4;5;6;7;8;9 ] ~f:(fun i ->
let address =
Printf.sprintf "tcp://*:%d" (port_number+i)
Printf.sprintf "tcp://%s:%d" (Lazy.force TaskServer.ip_address) (port_number+i)
in
ZMQ.Socket.bind dummy_socket address;
ZMQ.Socket.unbind dummy_socket address;
@ -50,10 +50,6 @@ let run slave exe ezfio_file =
Time.now ()
in
let address =
Printf.sprintf "tcp://%s:%d" (Lazy.force TaskServer.ip_address) port_number
in
if (not (Sys.file_exists_exn ezfio_file)) then
failwith ("EZFIO directory "^ezfio_file^" not found");
@ -103,6 +99,9 @@ let run slave exe ezfio_file =
in
thread ();
in
let address =
Printf.sprintf "tcp://%s:%d" (Lazy.force TaskServer.ip_address) port_number
in
Unix.putenv ~key:"QP_RUN_ADDRESS" ~data:address;
let () =
if (not slave) then

View File

@ -2,8 +2,6 @@ program cis
implicit none
integer :: i
! print *, 'HF = ', HF_energy
! print *, 'N_states = ', N_states
call H_apply_cis
print *, 'N_det = ', N_det
do i = 1,N_states

View File

@ -849,42 +849,42 @@ subroutine get_task_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task)
character*(64) :: reply
integer :: rc, sze
! call get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task,1)
write(message,*) 'get_task '//trim(zmq_state), worker_id
sze = len(trim(message))
rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0)
if (rc /= sze) then
print *, irp_here, ':f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)'
stop 'error'
endif
message = repeat(' ',512)
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 1024, 0)
rc = min(1024,rc)
read(message(1:rc),*) reply
if (trim(reply) == 'get_task_reply') then
read(message(1:rc),*) reply, task_id
rc = 15
do while (message(rc:rc) == ' ')
rc += 1
enddo
do while (message(rc:rc) /= ' ')
rc += 1
enddo
rc += 1
task = message(rc:)
else if (trim(reply) == 'terminate') then
task_id = 0
task = 'terminate'
else if (trim(message) == 'error No job is running') then
task_id = 0
task = 'terminate'
else
print *, 'Unable to get the next task'
print *, trim(message)
stop -1
endif
call get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task,1)
! write(message,*) 'get_task '//trim(zmq_state), worker_id
!
! sze = len(trim(message))
! rc = f77_zmq_send(zmq_to_qp_run_socket, message, sze, 0)
! if (rc /= sze) then
! print *, irp_here, ':f77_zmq_send(zmq_to_qp_run_socket, trim(message), sze, 0)'
! stop 'error'
! endif
!
! message = repeat(' ',512)
! rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 1024, 0)
! rc = min(1024,rc)
! read(message(1:rc),*) reply
! if (trim(reply) == 'get_task_reply') then
! read(message(1:rc),*) reply, task_id
! rc = 15
! do while (message(rc:rc) == ' ')
! rc += 1
! enddo
! do while (message(rc:rc) /= ' ')
! rc += 1
! enddo
! rc += 1
! task = message(rc:)
! else if (trim(reply) == 'terminate') then
! task_id = 0
! task = 'terminate'
! else if (trim(message) == 'error No job is running') then
! task_id = 0
! task = 'terminate'
! else
! print *, 'Unable to get the next task'
! print *, trim(message)
! stop -1
! endif
end
@ -918,9 +918,9 @@ subroutine get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task
rc = f77_zmq_recv(zmq_to_qp_run_socket, message, 1024, 0)
rc = min(1024,rc)
read(message(1:rc),*) reply
if (trim(reply) == 'get_task_reply ok') then
if (trim(message) == 'get_tasks_reply ok') then
continue
else if (trim(reply) == 'terminate') then
else if (trim(message) == 'terminate') then
task_id(1) = 0
task(1) = 'terminate'
else if (trim(message) == 'error No job is running') then
@ -928,7 +928,7 @@ subroutine get_tasks_from_taskserver(zmq_to_qp_run_socket,worker_id,task_id,task
task(1) = 'terminate'
else
print *, 'Unable to get the next task'
print *, trim(message)
print *, ':'//trim(message)//':'
stop -1
endif