001    /*
002     * Copyright 2005 Stephen J. McConnell.
003     *
004     * Licensed  under the  Apache License,  Version 2.0  (the "License");
005     * you may not use  this file  except in  compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     *   http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed  under the  License is distributed on an "AS IS" BASIS,
012     * WITHOUT  WARRANTIES OR CONDITIONS  OF ANY KIND, either  express  or
013     * implied.
014     *
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    
019    package net.dpml.station.server; 
020    
021    import java.rmi.AlreadyBoundException;
022    import java.rmi.RemoteException;
023    import java.rmi.server.UnicastRemoteObject;
024    import java.rmi.registry.LocateRegistry;
025    import java.rmi.registry.Registry;
026    import java.net.URL;
027    import java.util.Map;
028    import java.util.Hashtable;
029    import java.util.LinkedList;
030    import java.util.List;
031    import java.util.EventObject;
032    
033    import net.dpml.station.Application;
034    import net.dpml.station.Callback;
035    import net.dpml.station.Manager;
036    import net.dpml.station.Station;
037    import net.dpml.station.StationException;
038    
039    import net.dpml.station.info.ApplicationDescriptor;
040    import net.dpml.station.info.StartupPolicy;
041    import net.dpml.station.ApplicationRegistry;
042    
043    import net.dpml.util.Logger;
044    import net.dpml.transit.model.TransitModel;
045    import net.dpml.transit.Disposable;
046    
047    import net.dpml.lang.UnknownKeyException;
048    
049    /**
050     * The RemoteStation is responsible for the establishment of 
051     * callback monitors to external processes established by the 
052     * station manager.
053     * @author <a href="http://www.dpml.net">Digital Product Meta Library</a>
054     * @version 1.2.0
055     */
056    public class RemoteStation extends UnicastRemoteObject implements Station, Manager
057    {
058        private final RemoteApplicationRegistry m_registry;
059        private final Map m_applications = new Hashtable();
060        private final Logger m_logger;
061        private final int m_port;
062        private final Registry m_rmiRegistry;
063        private final URL m_store;
064        private final TransitModel m_model;
065        private final LoggingServer m_server;
066        private final Thread m_thread;
067        
068        private boolean m_terminated = false;
069        
070       /**
071        * Creation of a station instance.
072        *
073        * @param logger the assigned logging channel
074        * @param model the transit model
075        * @param port the station port 
076        * @param registryStorageUrl uri defining the registry backing store
077        * @exception Exception if a exception occurs during establishment
078        */
079        public RemoteStation( 
080          Logger logger, TransitModel model, int port, URL registryStorageUrl ) 
081          throws Exception
082        {
083            super();
084            
085            m_logger = logger;
086            m_port = port;
087            m_store = registryStorageUrl;
088            m_model = model;
089            
090            m_rmiRegistry = getLocalRegistry( port );
091            
092            try
093            {
094                m_rmiRegistry.bind( STATION_KEY, this );
095            }
096            catch( AlreadyBoundException e )
097            {
098                final String error =
099                 "An instance of the Station is already bound to port " + port;
100                throw new StationException( error, e );
101            }
102    
103            setShutdownHook( this );
104            startEventDispatchThread();
105    
106            try
107            {
108                m_server = new LoggingServer( 2020 );
109                m_thread = new Thread( m_server, "DPML Station Logging Server" );
110                m_thread.start();
111            }
112            catch( Exception e )
113            {
114                final String error =
115                 "Unexpected error while attempting to start the logging server on port " + 2020;
116                throw new StationException( error, e );
117            }
118            
119            if( getLogger().isDebugEnabled() )
120            {
121                if( null == registryStorageUrl )
122                {
123                    getLogger().debug( "loading registry from default storage" );
124                }
125                else
126                {
127                    getLogger().debug( "loading registry from [" + registryStorageUrl + "]" );
128                }
129            }
130            
131            m_registry = new RemoteApplicationRegistry( logger, registryStorageUrl );
132            String[] keys = m_registry.getKeys();
133            
134            if( getLogger().isDebugEnabled() )
135            {
136                getLogger().debug( "registry established (" + keys.length + ")" );
137            }
138            
139            for( int i=0; i<keys.length; i++ )
140            {
141                String key = keys[i];
142                try
143                {
144                    ApplicationDescriptor descriptor = 
145                      m_registry.getApplicationDescriptor( key );
146                    if( StartupPolicy.AUTOMATIC.equals( descriptor.getStartupPolicy() ) )
147                    {
148                        RemoteApplication application = getRemoteApplication( key );
149                        application.start();
150                    }
151                }
152                catch( UnknownKeyException e )
153                {
154                    throw new RuntimeException( e ); // will not happen
155                }
156            }
157        }
158        
159       /**
160        * Return a string containing info about the general setup of the station.
161        * @return station configuration info
162        */
163        public String[] getInfo()
164        {
165            String[] values = new String[4];
166            values[0] = "Port: " + m_port;
167            values[1] = "Store: " + m_store;
168            values[2] = "Basedir: " + System.getProperty( "user.dir" );
169            values[3] = "Codebase: " 
170              + getClass().getProtectionDomain().getCodeSource().getLocation();
171            return values;
172        }
173        
174       /**
175        * Return an callback handler for the supplied id.
176        * @param id the callback id
177        * @return the callback handler
178        * @exception UnknownKeyException if the id is unknown
179        * @exception RemoteException if a remote error occurs
180        */
181        public Callback getCallback( String id ) throws UnknownKeyException, RemoteException
182        {
183            // TODO: improve this so that this is only called once per appliation
184            return getRemoteApplication( id );
185        }
186        
187       /**
188        * Shutdown the station.
189        */
190        public void shutdown()
191        {
192            shutdown( true );
193        }
194        
195       /**
196        * Shutdown the station.
197        * @param exit if true launch a process termination
198        */
199        private void shutdown( boolean exit )
200        {
201            synchronized( m_applications )
202            {
203                if( m_terminated )
204                {
205                    return;
206                }
207                else
208                {
209                    m_terminated = true;
210                }
211                
212                if( getLogger().isInfoEnabled() )
213                {
214                    getLogger().info( "initiating station shutdown" );
215                }
216                
217                try
218                {
219                    m_rmiRegistry.unbind( STATION_KEY );
220                }
221                catch( Exception e )
222                {
223                    // ignore
224                }
225                try
226                {
227                    RemoteApplication[] applications = getRemoteApplications();
228                    for( int i=0; i<applications.length; i++ )
229                    {
230                        RemoteApplication application = applications[i];
231                        application.shutdown();
232                        UnicastRemoteObject.unexportObject( application, true );
233                    }
234                    UnicastRemoteObject.unexportObject( m_registry, true );
235                }
236                catch( Exception e )
237                {
238                    // ignore
239                }
240                try
241                {
242                    int n =  m_server.getErrorCount();
243                    if( n > 0 )
244                    {
245                        getLogger().warn( "logging issues: " + n );
246                    }
247                    m_thread.interrupt();
248                }
249                catch( Exception e )
250                {
251                    // ignore
252                }
253                
254                finally
255                {
256                    if( getLogger().isInfoEnabled() )
257                    {
258                        getLogger().info( "station shutdown complete" );
259                    }
260                    
261                    if( exit )
262                    {
263                        if( m_model instanceof Disposable )
264                        {
265                            try
266                            {
267                                Disposable disposable = (Disposable) m_model;
268                                disposable.dispose();
269                            }
270                            catch( Exception e )
271                            {
272                                // ignore
273                            }
274                        }
275                        
276                        if( getLogger().isDebugEnabled() )
277                        {
278                            getLogger().debug( "terminating process" );
279                        }
280                        
281                        Thread thread = new Thread(
282                          new Runnable()
283                          {
284                            public void run()
285                            {
286                                RemoteStation.m_DISPATCH.dispose();
287                                System.exit( 0 );
288                            }
289                          }
290                        );
291                        thread.start();
292                    }
293                }
294            }
295        }
296        
297       /**
298        * Return the application registry.
299        * @return the registry
300        */
301        public ApplicationRegistry getApplicationRegistry()
302        {
303            return m_registry;
304        }
305    
306       /**
307        * Return an application reference for the supplied key.
308        * @param key the application key
309        * @return the application
310        * @exception UnknownKeyException if the key is unknown
311        * @exception RemoteException if a remote error occurs
312        */
313        public Application getApplication( String key ) throws UnknownKeyException, RemoteException
314        {
315            return getRemoteApplication( key );
316        }
317        
318       /**
319        * Return an application reference for the supplied key.
320        * @param key the application key
321        * @return the application
322        * @exception UnknownKeyException if the key is unknown
323        * @exception RemoteException if a remote error occurs
324        */
325        RemoteApplication getRemoteApplication( String key ) throws UnknownKeyException, RemoteException
326        {
327            synchronized( m_applications )
328            {
329                if( m_applications.containsKey( key ) )
330                {
331                    return (RemoteApplication) m_applications.get( key );
332                }
333                else
334                {
335                    Logger logger = getLogger().getChildLogger( key );
336                    ApplicationDescriptor descriptor = m_registry.getApplicationDescriptor( key );
337                    RemoteApplication application = 
338                      new RemoteApplication( logger, descriptor, key, m_port );
339                    m_applications.put( key, application );
340                    return application;
341                }
342            }
343        }
344    
345       /**
346        * Return an array of all remote applications.
347        * @return the applications array
348        */
349        RemoteApplication[] getRemoteApplications()
350        {
351            synchronized( m_applications )
352            {
353                return (RemoteApplication[]) m_applications.values().toArray( new RemoteApplication[0] );
354            }
355        }
356        
357        private Logger getLogger()
358        {
359            return m_logger;
360        }
361        
362        private Registry getLocalRegistry( int port ) throws RemoteException
363        {
364            try
365            {
366                Registry registry = LocateRegistry.createRegistry( port );
367                getLogger().debug( "created local registry on port " + port );
368                return registry;
369            }
370            catch( RemoteException e )
371            {
372                Registry registry = LocateRegistry.getRegistry( port );
373                getLogger().debug( "using local registry on port " + port );
374                return registry;
375            }
376        }
377    
378        /**
379         * Queue of pending notification events.  When an event for which 
380         * there are one or more listeners occurs, it is placed on this queue 
381         * and the queue is notified.  A background thread waits on this queue 
382         * and delivers the events.  This decouples event delivery from 
383         * the application concern, greatly simplifying locking and reducing 
384         * opportunity for deadlock.
385         */
386        private static final List EVENT_QUEUE = new LinkedList();
387    
388       /**
389        * Enqueue an event for delivery to registered
390        * listeners unless there are no registered
391        * listeners.
392        * @param event the event to enqueue
393        */
394        static void enqueueEvent( EventObject event )
395        {
396            synchronized( EVENT_QUEUE ) 
397            {
398                EVENT_QUEUE.add( event );
399                EVENT_QUEUE.notify();
400            }
401        }
402        
403        /**
404         * A single background thread ("the event notification thread") monitors
405         * the event queue and delivers events that are placed on the queue.
406         */
407        private static class EventDispatchThread extends Thread 
408        {
409            private final Logger m_logger;
410            
411            private boolean m_continue = true;
412            
413            EventDispatchThread( Logger logger )
414            {
415                super( "DPML Station Event Dispatch" );
416                m_logger = logger;
417                m_logger.debug( "starting event dispatch thread" );
418            }
419            
420            void dispose()
421            {
422                synchronized( EVENT_QUEUE )
423                {
424                    m_logger.debug( "stopping event dispatch thread" );
425                    m_continue = false;
426                    EVENT_QUEUE.notify();
427                }
428            }
429            
430            public void run() 
431            {
432                while( m_continue ) 
433                {
434                    // Wait on EVENT_QUEUE till an event is present
435                    EventObject event = null;
436                    synchronized( EVENT_QUEUE ) 
437                    {
438                        try
439                        {
440                            while( EVENT_QUEUE.isEmpty() )
441                            {
442                                EVENT_QUEUE.wait();
443                            }
444                            Object object = EVENT_QUEUE.remove( 0 );
445                            try
446                            {
447                                event = (EventObject) object;
448                            }
449                            catch( ClassCastException cce )
450                            {
451                                final String error = 
452                                  "Unexpected class cast exception while processing an event." 
453                                  + "\nEvent: " + object;
454                                throw new IllegalStateException( error );
455                            }
456                        }
457                        catch( InterruptedException e )
458                        {
459                            return;
460                        }
461                    }
462                    
463                    Object source = event.getSource();
464                    if( source instanceof UnicastEventSource )
465                    {
466                        UnicastEventSource producer = (UnicastEventSource) source;
467                        try
468                        {
469                            producer.processEvent( event );
470                        }
471                        catch( Throwable e )
472                        {
473                            final String error = 
474                              "Unexpected error while processing event."
475                              + "\nEvent: " + event
476                              + "\nSource: " + source;
477                            m_logger.warn( error, e );
478                        }
479                    }
480                    else
481                    {
482                        final String error = 
483                          "Event source [" 
484                          + source.getClass().getName()
485                          + "] is not an instance of " + UnicastEventSource.class.getName();
486                        throw new IllegalStateException( error );
487                    }
488                }
489                
490                m_logger.info( "Controller event queue terminating." );
491            }
492        }
493    
494        private static EventDispatchThread m_DISPATCH = null;
495    
496        /**
497         * This method starts the event dispatch thread the first time it
498         * is called.  The event dispatch thread will be started only
499         * if someone registers a listener.
500         */
501        private synchronized void startEventDispatchThread()
502        {
503            if( m_DISPATCH == null )
504            {
505                Logger logger = getLogger();
506                m_DISPATCH = new EventDispatchThread( logger );
507                m_DISPATCH.setDaemon( true );
508                m_DISPATCH.start();
509            }
510        }
511            
512       /**
513        * Create a shutdown hook that will trigger shutdown of the supplied plugin.
514        * @param station the station
515        */
516        public static void setShutdownHook( final RemoteStation station )
517        {
518            //
519            // Create a shutdown hook to trigger clean disposal of the
520            // controller
521            //
522            
523            Runtime.getRuntime().addShutdownHook(
524              new Thread()
525              {
526                  public void run()
527                  {
528                      try
529                      {
530                          station.shutdown();
531                      }
532                      catch( Throwable e )
533                      {
534                          System.err.println( e.toString() );
535                      }
536                      System.runFinalization();
537                  }
538              }
539            );
540        }
541        
542    }