001 package javax.realtime; 002 003 /** The wait-free queue facilitate communication and synchronization 004 * between instances of <code>RealtimeThread</code> and 005 * <code>java.lang.Thread</code>. The problem is that synchronized 006 * access objects shared between real-time threads and threads 007 * might cause the real-time threads to incur delays due to 008 * execution of the garbage collector. 009 * <p> 010 * The <code>write()</code> method of this class does not block on 011 * an imagined queue-full condition variable. If the <code>write()</code> 012 * method is called on a full queue false is returned. If two real-time 013 * threads intend to read from this queue they must provide their 014 * own synchronization. 015 * <p> 016 * The <code>read()</code> method of this queue is synchronized and 017 * may be called by more than one writer and will block on queue empty. 018 */ 019 public class WaitFreeWriteQueue { 020 021 protected Object[] writeQueue = null; 022 protected int queueSize; 023 protected int currentIndex = 0; 024 025 protected Thread writerThread = null, readerThread = null; 026 protected MemoryArea memArea; 027 028 /** A queue with an unsynchronized and nonblocking <code>write()</code> 029 * method and a synchronized and blocking <code>read()</code> method. 030 * 031 * @param writer An instance of <code>java.lang.Thread</code>. 032 * @param reader An instance of <code>java.lang.Thread</code>. 033 * @param maximum The maximum number of elements in the queue. 034 * @param memory The <code>MemoryArea</code> in which this object and 035 * internal elements are allocated. 036 * @throws java.lang.IllegalArgumentException If an argument holds an 037 * invalid value. The current 038 * memory areas of 039 * <code>writer</code>, 040 * <code>reader</code>, and 041 * <code>memory</code> must be 042 * compatible with respect to 043 * the assignment and access 044 * rules for memory areas. 045 */ 046 public WaitFreeWriteQueue(Thread writer, Thread reader, 047 int maximum, MemoryArea memory) 048 throws IllegalArgumentException { 049 writerThread = writer; 050 readerThread = reader; 051 queueSize = maximum; 052 memArea = memory; 053 054 // TODO (?) 055 } 056 057 /** Set <code>this</code> to empty. */ 058 public void clear() { 059 currentIndex = 0; 060 } 061 062 /** Force this <code>java.lang.Object</code> to replace the last one. 063 * If the reader should happen to have just removed the other 064 * <code>java.lang.Object</code> just as we were updating it, we will 065 * return false. False may mean that it just saw that we put in there. 066 * Either way, the best thing to do is to just write again -- which 067 * will succeed, and check on the readers side for consecutive 068 * identical read values. 069 * 070 * @return True, if an element was overwritten. False, if there is an 071 * empty element into which the write occured. 072 * @throws MemoryScopeException 073 */ 074 public boolean force(Object object) throws MemoryScopeException { 075 if (!isFull()) return write(object); 076 else { 077 writeQueue[currentIndex] = object; 078 return true; 079 } 080 } 081 082 /** Queries the system to determine if <code>this</code> is empty. 083 * 084 * @return True, if <code>this</code> is empty. False, if 085 * <code>this</code> is not empty. 086 */ 087 public boolean isEmpty() { 088 return (currentIndex == 0); 089 } 090 091 /** Queries the system to determine if <code>this</code> is full. 092 * 093 * @return True, if <code>this</code> is full. False, if 094 * <code>this</code> is not full. 095 */ 096 public boolean isFull() { 097 return (currentIndex == queueSize - 1); 098 } 099 100 /** A synchronized read on the queue. 101 * 102 * @return The <code>java.lang.Object</code> read or null if 103 * <code>this</code> is empty. 104 */ 105 public synchronized Object read() { 106 while (isEmpty()) 107 try { 108 Thread.sleep(100); 109 } catch (Exception e) {}; 110 111 Object temp = writeQueue[0]; 112 for (int i = 0; i < currentIndex; i++) 113 writeQueue[i] = writeQueue[i+1]; 114 currentIndex--; 115 return temp; 116 } 117 118 /** Queries the system to determine the number of elements in 119 * <code>this</code>. 120 * 121 * @return An integer which is the number of non-empty positions in 122 * <code>this</code>. 123 */ 124 public int size() { 125 return currentIndex; 126 } 127 128 /** Attempt to insert an element into the queue. 129 * 130 * @param object The <code>java.lang.Object</code> to insert. 131 * @return True, if the write succeeded. False, if not. 132 * @throws MemoryScopeException 133 */ 134 public boolean write(Object object) throws MemoryScopeException { 135 if (isFull()) return false; 136 else { 137 writeQueue[++currentIndex] = object; 138 return true; 139 } 140 } 141 }