source: branches/streams/abcl/tools/threads-jss.lisp

Last change on this file was 13703, checked in by Mark Evenson, 13 years ago

Fix #183: move threads-jss.lisp out of system source.

'threads-jss.lisp' provides a rudimentary implementation of a server
framework using the java.util.concurrent abstractions. Ripped out of
another project, the code uses the JSS syntax for brevity, which makes
it more or less impossible to compile as system source because it
depends on the JSS contrib. We move it to the tools directory until
we can rewrite it to use the lower-level Java FFI primitives instead.

File size: 4.7 KB
Line 
1;;;; Copyright (C) 2011 Mark Evenson
2
(in-package #:threads)

;;; The #"method" reader syntax used throughout this file is provided by the
;;; JSS contrib.  Both contribs must be available in every situation in which
;;; this file is processed: when it is compiled (so the reader macro is
;;; installed before the forms below are read), when the compiled file is
;;; loaded, and when the source is loaded directly.  Requiring JSS at compile
;;; time only leaves the other two cases without it.  ABCL-CONTRIB is required
;;; first, inside the same EVAL-WHEN, so that it is also present at compile
;;; time before JSS is requested.
(eval-when (:compile-toplevel :load-toplevel :execute)
  (require 'abcl-contrib)
  (require 'jss))
8
;;;; Server state.  Every variable below starts out as NIL.

;;; XXX possibly need multiple thread pools
(defparameter *thread-pool* nil
  "The current JVM class implementing the ScheduledThreadPool abstraction.")

;; Set to T at the end of START-SERVER and back to NIL by STOP-SERVER.
(defparameter *server-running-p* nil)

;; Future for the incoming-directory watcher, stored by SCHEDULE-THREADS.
(defparameter *incoming-scheduled-future* nil)

;; Future for the queue watcher, stored by SCHEDULE-THREADS.
(defparameter *watch-queue-future* nil)

;; Further futures that STOP-SERVER cancels along with the two above.
(defparameter *scheduled-futures* nil)
17
18
;;;; Configure the directories for a threadpool from these defaults.
(defparameter *root* #p"/var/tmp/abcl-threads/")

(defparameter *logs* (merge-pathnames "logs/" *root*))

(defparameter *incoming* (merge-pathnames "incoming/" *root*))

(defparameter *queue* (merge-pathnames "queue/" *root*))

(defparameter *processed* (merge-pathnames "processed/" *root*))

;;; Directories that START-SERVER creates before anything else runs.  The
;;; list has to cover every directory the server writes into: the log file
;;; is opened under *LOGS* and accepted input is renamed into *QUEUE*, so
;;; listing only *INCOMING* leaves those operations pointing at directories
;;; that may not exist.  Defined last because it refers to all of the
;;; variables above.
(defparameter *dirs* (list *logs* *incoming* *queue* *processed*))
30
31
32;;;; A simple logging abstraction.
33
;;; Three-letter month abbreviations in calendar order; FORMAT-TIME indexes
;;; this list with (1- month).
(defconstant +month-names+
  '("Jan" "Feb" "Mar" "Apr" "May" "Jun" "Jul" "Aug" "Sep" "Oct" "Nov" "Dec"))

;;; The SECONDS field of java.util.concurrent.TimeUnit, passed as the time
;;; unit when SCHEDULE-THREADS schedules its tasks.
(defconstant +seconds+
  (java:jfield "java.util.concurrent.TimeUnit" "SECONDS"))

;;; Destination stream for the LOG macro; nothing is logged while it is NIL.
(defparameter *log* *standard-output*)
38
(defun format-time (universal-time)
  "Return UNIVERSAL-TIME as a string of the form \"Mon D HH:MM:SS\".

The time is decoded with DECODE-UNIVERSAL-TIME without an explicit time
zone argument.  The month is rendered through +MONTH-NAMES+, the day of the
month is not padded, and hour, minute and second are zero-padded to two
digits.  The year is not part of the result."
  (multiple-value-bind
        (second minute hour date month year day-of-week dst-p tz)
      (decode-universal-time universal-time)
    ;; Only the first five values are printed; declare the rest ignored so
    ;; the compiler does not warn about unused variables.
    (declare (ignore year day-of-week dst-p tz))
    (format nil "~&~A ~A ~2,'0D:~2,'0D:~2,'0D"
            (nth (1- month) +month-names+) date hour minute second)))
45
(defmacro log (message &rest parameters)
  "Write one timestamped entry to the stream in *LOG*.

MESSAGE is a format control and PARAMETERS are its arguments.  The expansion
prints the current time via FORMAT-TIME, then the formatted message, then a
fresh-line directive.  When *LOG* is NIL nothing is printed and neither
MESSAGE nor PARAMETERS is evaluated.

NOTE(review): LOG is also the name of a standard Common Lisp function; this
presumably relies on the THREADS package permitting that definition --
confirm."
  (let ((timestamp-form '(format *log* "~A " (format-time (get-universal-time))))
        (message-form (list* 'format '*log* message parameters))
        (newline-form '(format *log* "~&")))
    (list 'when '*log* timestamp-form message-form newline-form)))
51
52;;; Start a pool of hungry philosophers.
(defun start-server ()
  "Create the working directories, open the log file and start the thread pool.

Signals an error if *SERVER-RUNNING-P* is already true, or if any directory
in *DIRS* or the directory holding the log file cannot be created.  On
success *LOG* is an append-mode stream to abcl-threads.log under *LOGS*,
SCHEDULE-THREADS has been called and *SERVER-RUNNING-P* is T."
  (when *server-running-p*
    (error "Server not recorded as stopped."))
  (let ((logfile (merge-pathnames "abcl-threads.log" *logs*)))
    ;; ENSURE-DIRECTORIES-EXIST reports failure by signalling FILE-ERROR, not
    ;; through its return value, so testing the result of MAPCAR can never
    ;; detect a problem.  Translate the condition into the intended message.
    (handler-case
        (progn
          (mapc #'ensure-directories-exist *dirs*)
          ;; OPEN creates the file but not its directory, so make sure the
          ;; log directory exists even if *DIRS* does not list it.
          (ensure-directories-exist logfile))
      (file-error ()
        (error "Failed to create directories under '~A'." *root*)))
    (setf *log*
          (open logfile :direction :output :if-exists :append :if-does-not-exist :create))
    (format *standard-output* "Logging to ~A." logfile))
  (log "Starts.")
  (schedule-threads)
  (setf *server-running-p* t))
66
(defun stop-server (&key (force nil))
  "Cancel the scheduled futures, shut the thread pool down and close the log.

Unless FORCE is true, signals an error when *SERVER-RUNNING-P* is NIL.  With
FORCE the shutdown steps run regardless of the recorded state, so each step
tolerates state that was never initialised.  Sets *SERVER-RUNNING-P* to NIL."
  (unless force
    (unless *server-running-p*
      (error "Server not recorded as running.")))
  (log "Stopping the server.")
  ;; The two watcher futures are NIL until SCHEDULE-THREADS has stored them;
  ;; drop NILs so a forced stop does not invoke Java methods on NIL.
  (dolist (future (remove nil (list* *incoming-scheduled-future*
                                     *watch-queue-future*
                                     *scheduled-futures*)))
    (unless (or (#"isCancelled" future)
                (#"isDone" future))
      (#"cancel" future t)))
  ;; Likewise *THREAD-POOL* is NIL until SCHEDULE-THREADS has created a pool.
  (when *thread-pool*
    (#"shutdown" *thread-pool*))
  ;; Close only a log stream of our own: *LOG* defaults to *STANDARD-OUTPUT*,
  ;; which must stay open.  Point *LOG* back at *STANDARD-OUTPUT* before
  ;; closing so later uses of the LOG macro do not write to a closed stream.
  (let ((stream *log*))
    (when (and (streamp stream)
               (not (eq stream *standard-output*)))
      (setf *log* *standard-output*)
      (close stream)))
  (setf *server-running-p* nil))
79
(defun schedule-threads ()
  "Create a fresh single-threaded scheduled pool and schedule both watchers.

Stores the new pool in *THREAD-POOL*, the queue watcher's future in
*WATCH-QUEUE-FUTURE* and the incoming watcher's future in
*INCOMING-SCHEDULED-FUTURE*.  Both tasks are scheduled with a fixed delay of
1 second between runs; the queue watcher first runs after 10 seconds and the
incoming watcher after 1 second."
  (log "Starting thread pool.")
  (when *thread-pool*
    (log "Removing existing incoming thread pool.")
    ;; Actually release the old executor instead of merely overwriting the
    ;; reference to it; otherwise it is leaked.  Calling shutdown on a pool
    ;; that STOP-SERVER has already shut down has no further effect.
    (#"shutdown" *thread-pool*))
  (setf *thread-pool*
        (#"newScheduledThreadPool" 'java.util.concurrent.Executors 1))
  (#"setExecuteExistingDelayedTasksAfterShutdownPolicy" *thread-pool* nil)
  ;; NOTE(review): INITIALIZE-QUEUE and MAKE-WATCH-QUEUE are not defined in
  ;; the visible part of this file -- presumably supplied elsewhere; confirm.
  (initialize-queue)
  (log "Scheduling queue watcher.")
  (setf *watch-queue-future*
        (#"scheduleWithFixedDelay"
         *thread-pool*
         (make-watch-queue) 10 1 +seconds+))
  (log "Scheduling incoming watcher.")
  (setf *incoming-scheduled-future*
        (#"scheduleWithFixedDelay"
         *thread-pool*
         (make-process-incoming) 1 1 +seconds+)))
98
(defun make-process-incoming ()
  "Return a java.lang.Runnable implementation whose run method is
PROCESS-INCOMING."
  (let ((interface "java.lang.Runnable")
        (method "run")
        (handler (function process-incoming)))
    (java:jinterface-implementation interface method handler)))
101
(defun process-incoming ()
  "Handle every file currently present in the *INCOMING* directory.

Each file is passed to the local PROCESS function and its result to VALIDATE.
A file that fails processing, or for which VALIDATE reports invalid rows, is
rejected; any other file is renamed into *QUEUE* under the same name and
type.  Rejection only signals a warning: the file is left where it is, so it
is still listed the next time this function runs.  Returns NIL."
  (flet ((reject-input (file invalid)
           ;; Pass the values as format arguments.  Using pre-formatted text
           ;; as the control string would misinterpret any tilde occurring
           ;; in a file name as a format directive.
           (warn "~A is ~A" file invalid))
         (process (file)
           ;; Placeholder that accepts every file and yields no table.
           (declare (ignore file))
           nil))
    (let ((incoming (directory (merge-pathnames *incoming* "*"))))
      (when incoming
        (log "Processing ~A incoming items." (length incoming))
        (dolist (file incoming)
          (log "Analyzing ~A." file)
          (multiple-value-bind (table failure)
              ;; Trap errors only.  A clause on type T would also intercept
              ;; warnings and other non-error conditions and treat them as
              ;; processing failures.
              (handler-case (values (process file) nil)
                (error (e)
                  (log "Failed to process ~A because ~A" file e)
                  (values nil e)))
            (if failure
                ;; FAILURE is a condition object, never a list, so it is
                ;; always wrapped.
                (reject-input file (list failure))
                ;; NOTE(review): VALIDATE is not defined in the visible part
                ;; of this file -- presumably it returns the valid and the
                ;; invalid rows as two values; confirm.
                (multiple-value-bind (valid invalid)
                    (validate table)
                  (declare (ignore valid))
                  (if invalid
                      (progn
                        (log "Rejecting ~A because of invalid rows." file)
                        (reject-input file invalid))
                      (let ((target
                              (make-pathname :defaults *queue*
                                             :name (pathname-name file)
                                             :type (pathname-type file))))
                        (log "Inserting ~A." target)
                        ;; RENAME-FILE does not create the destination
                        ;; directory; make sure the queue directory exists.
                        (ensure-directories-exist target)
                        (rename-file file target)))))))))))
136
Note: See TracBrowser for help on using the repository browser.