source: trunk/abcl/src/org/armedbear/lisp/threads-jss.lisp @ 13616

Last change on this file since 13616 was 13616, checked in by Mark Evenson, 10 years ago

A wrapping of the built-in Java thread pool of Executors.

Provides a basis for experimenting with multi-core Lisp execution.
In the future, java.util.concurrent.Callable would be the interface to make
individual executions based upon.

THREADS:START-SERVER will start a primitive, file-based message queue
system that processes files as they are created in a given directory.

File size: 3.0 KB
Line 
1;;;; Copyright (C) 2011 Mark Evenson
2
3(in-package #:threads)
4
5(require 'abcl-contrib)
6(eval-when (:compile-toplevel)
7           (require 'jss))
8
9(defparameter *server-running-p* nil)
10
11;;; XXX possibly need multiple thread pools
12(defparameter *thread-pool* nil)
13(defparameter *scheduled-futures* nil)
14(defparameter *incoming-scheduled-future* nil)
15(defparameter *watch-queue-future* nil)
16
17
18;;;; Configure the directories for a threadpool from these defaults.
19(defparameter *root* #p"/var/tmp/abcl-threads/")
20
21(defparameter *logs* (merge-pathnames "logs/" *root*))
22
23(defparameter *incoming* (merge-pathnames "incoming/" *root*))
24(defparameter *dirs* (list *incoming*))
25
26;;;; A simple logging abstraction.
27
28(defconstant +month-names+ '("Jan" "Feb" "Mar" "Apr" "May" "Jun" 
29                             "Jul" "Aug" "Sep" "Oct" "Nov" "Dec"))
30
31(defparameter *log* *standard-output*)
32
33(defun format-time (universal-time)
34    (multiple-value-bind 
35          (second minute hour date month year day-of-week dst-p tz)
36        (decode-universal-time universal-time)
37      (format nil "~&~A ~A ~2,'0D:~2,'0D:~2,'0D" 
38              (nth (1- month) +month-names+) date hour minute second)))
39
40(defmacro log (message &rest parameters)
41  `(when *log*
42     (format *log* "~A " (format-time (get-universal-time)))
43     (format *log* ,message ,@parameters)
44     (format *log* "~&")))
45
46;;; Start a pool of hungry philosophers.
47(defun start-server () 
48  (when *server-running-p*
49    (error "Server not recorded as stopped."))
50  (unless 
51      (mapcar #'ensure-directories-exist *dirs*)
52    (error "Failed to create directories under '~A'." *root*))
53  (let ((logfile (merge-pathnames "abcl-threads.log" *logs*)))
54    (setf *log* 
55          (open logfile :direction :output :if-exists :append :if-does-not-exist :create))
56    (format *standard-output* "Logging to ~A." logfile))
57  (log "Starts.")
58  (schedule-threads)
59  (setf *server-running-p* t))
60
61(defun stop-server (&key (force nil))
62  (unless force
63    (unless *server-running-p*
64      (error "Server not recorded as running.")))
65  (log "Stopping the server.")
66  (dolist (future `(,*incoming-scheduled-future* ,*watch-queue-future* ,@*scheduled-futures*))
67    (when (not (or (#"isCancelled" future)
68                   (#"isDone" future)))
69      (#"cancel" future t)))
70  (#"shutdown" *thread-pool*)
71  (close *log*)
72  (setf *server-running-p* nil))
73
74(defun schedule-threads ()
75  (log "Starting thread pool.")
76  (when *thread-pool*
77    (log "Removing existing incoming thread pool."))
78  (setf *thread-pool*
79        (#"newScheduledThreadPool" 'java.util.concurrent.Executors 1))
80  (#"setExecuteExistingDelayedTasksAfterShutdownPolicy" *thread-pool* nil)
81  (initialize-queue)
82  (log "Scheduling queue watcher.")
83  (setf *watch-queue-future* 
84        (#"scheduleWithFixedDelay" 
85         *thread-pool*
86         (make-watch-queue) 10 1 +seconds+))
87  (log "Scheduling incoming watcher.")
88  (setf *incoming-scheduled-future*
89        (#"scheduleWithFixedDelay" 
90         *thread-pool*
91         (make-process-incoming) 1 1 +seconds+)))
92
Note: See TracBrowser for help on using the repository browser.