SingleThread.h
1 //-*-C++-*-
2 /***************************************************************************
3  *
4  * Copyright (C) 2011 by Willem van Straten
5  * Licensed under the Academic Free License version 2.1
6  *
7  ***************************************************************************/
8 
9 // dspsr/Signal/General/dsp/SingleThread.h
10 
11 #ifndef __dspsr_SingleThread_h
12 #define __dspsr_SingleThread_h
13 
14 #include "dsp/Pipeline.h"
15 #include "CommandLine.h"
16 #include "Functor.h"
17 #include "TextEditor.h"
18 #include "MJD.h"
19 
20 class ThreadContext;
21 
22 namespace dsp {
23 
24  class Source;
25  class TimeSeries;
26  class Operation;
27  class Observation;
28  class Scratch;
29  class Memory;
30 
32  class SingleThread : public Pipeline
33  {
34 
35  public:
36 
38  friend class MultiThread;
39 
41  class Config;
42 
44  SingleThread ();
45 
47  ~SingleThread ();
48 
50  void set_configuration (Config*);
51 
53  void set_source (Source*);
54 
56  Source* get_source ();
57 
59  void initialize ();
60 
62  void construct ();
63 
65  void prepare ();
66 
68  void seek_epoch (const MJD&);
69 
71  void run ();
72 
74  virtual void share (SingleThread*);
75 
77  virtual void combine (const SingleThread*);
78 
80  void finish ();
81 
83  uint64_t get_minimum_samples () const;
84 
86  std::ostream cerr;
87 
89  void take_ostream (std::ostream* newlog);
90 
91  unsigned thread_id;
92  void set_affinity (int core);
93 
95  std::vector< Reference::To<Operation> > const get_operations () { return operations; };
96 
98  /*
99  returns the estimated number of bytes required to store data in the pipeline
100  */
101  uint64_t set_block_size (uint64_t minimum_samples, uint64_t input_overlap = 0);
102 
103  protected:
104 
106  virtual void end_of_data ();
107 
109  virtual void prepare (Source*);
110 
112  void configure_overlap (Source*, Memory*);
113 
115  std::ostream* log;
116 
118  enum State
119  {
120  Fail,
126  Run,
129  };
130 
133 
136 
139 
142 
145 
148 
151 
154 
157  TimeSeries* new_TimeSeries () { return new_time_series(); }
158 
160  std::vector< Reference::To<Operation> > operations;
161 
163  void insert_dump_point (const std::string& transformation_name);
164 
166  void insert_mask (const std::string& transformation_name);
167 
170 
172  uint64_t minimum_samples;
173 
174  Reference::To<Memory> device_memory;
175  void* gpu_stream;
176  int gpu_device;
177 
179  const std::string& app();
180  };
181 
182  class Input;
183 
186  {
187  public:
188 
190  Config ();
191 
193  virtual void add_options (CommandLine::Menu&);
194 
196  Source* open (int argc, char** argv);
197 
198  // set block size to this factor times the minimum possible
199  void set_times_minimum_ndat (unsigned);
200  unsigned get_times_minimum_ndat () const { return times_minimum_ndat; }
201 
202  // set block_size to result in approximately this much RAM usage
203  void set_maximum_RAM (uint64_t);
204  uint64_t get_maximum_RAM () const { return maximum_RAM; }
205 
206  // set block_size to result in at least this much RAM usage
207  void set_minimum_RAM (uint64_t);
208  uint64_t get_minimum_RAM () const { return minimum_RAM; }
209 
211  virtual void prepare (Source*);
212 
214  std::string application_name;
215 
218 
219  // Input files represent a single continuous observation
220  bool force_contiguity;
221 
222  // Command line values are header params, not file names
223  bool command_line_header;
224 
226 
228 
229  // number of seconds to seek into data
230  double seek_seconds;
231 
232  // number of seconds to process from data
233  double total_seconds;
234 
237 
240 
243 
246 
249 
251  void set_cuda_device (std::string);
252  unsigned get_cuda_ndevice () const { return cuda_device.size(); }
253 
255  void set_nthread (unsigned);
256 
258  unsigned get_total_nthread () const;
259 
261  void set_affinity (std::string);
262 
264  void set_fft_library (std::string);
265 
268 
271 
273  std::vector<std::string> dump_before;
274 
276  std::vector<std::string> mask_before;
277 
279  unsigned get_nbuffers () const { return buffers; }
280 
282  virtual void set_quiet ();
283 
285  virtual void set_verbose ();
286 
288  virtual void set_very_verbose ();
289 
290  protected:
291 
293  friend class SingleThread;
294 
296  bool can_cuda;
297 
299  std::vector<unsigned> cuda_device;
300 
303 
305  std::vector<unsigned> affinity;
306 
308  unsigned nthread;
309 
311  unsigned buffers;
312 
314  unsigned repeated;
315 
316  // set block size to this factor times the minimum possible
317  unsigned times_minimum_ndat;
318 
319  // set block size to result in approximately this much RAM usage
320  uint64_t maximum_RAM;
321 
322  // set block size to result in at least this much RAM usage
323  uint64_t minimum_RAM;
324  };
325 
326 }
327 
328 #endif // !defined(__SingleThread_h)
void set_fft_library(std::string)
set the FFT library
Definition: SingleThread.C:1051
void prepare()
Prepare the signal processing pipeline.
Definition: SingleThread.C:316
bool can_cuda
application can make use of CUDA
Definition: SingleThread.h:296
Error error
Error status.
Definition: SingleThread.h:135
unsigned get_total_nthread() const
get the total number of threads
Definition: SingleThread.C:890
unsigned repeated
number of times that the input has been re-opened
Definition: SingleThread.h:314
bool list_attributes
List all editor-accessible attributes of the observation.
Definition: SingleThread.h:236
void insert_dump_point(const std::string &transformation_name)
Insert a dump point before the named operation.
Definition: SingleThread.C:469
Produces TimeSeries data by integrating an Input with an Unpacker.
Definition: IOManager.h:26
virtual void set_verbose()
Operate in verbose mode.
Definition: SingleThread.C:1038
@ Run
preparations completed
Definition: SingleThread.h:126
bool can_thread
application can make use of multiple cores
Definition: SingleThread.h:302
Pure virtual base class of all objects that can load BitSeries data.
Definition: Input.h:31
@ Construct
nothing happening
Definition: SingleThread.h:122
Contains all Baseband Data Reduction Library classes.
Definition: ASCIIObservation.h:17
std::string get_library_name(unsigned i)
dsp::Source * create(const std::string &descriptor)
Construct a new child of Source based on the descriptor.
Definition: SourceFactory.C:21
void configure_overlap(Source *, Memory *)
Configure overlap memory, if possible.
Definition: SingleThread.C:454
Source * get_source()
Get the Source from which data are obtained.
Definition: SingleThread.C:152
virtual void combine(const SingleThread *)
Combine the results from another processing thread.
Definition: SingleThread.C:678
Reference::To< Scratch > scratch
The scratch space shared by all operations.
Definition: SingleThread.h:169
State
Processing thread states.
Definition: SingleThread.h:118
MJD seek_epoch
Seek such that the first time sample output is at the specified (topocentric) MJD.
Definition: SingleThread.h:227
void set_verbosity(unsigned level)
Set the verbosity level of various base classes.
Definition: dsp_verbosity.C:13
std::vector< Reference::To< Operation > > operations
The operations to be performed.
Definition: SingleThread.h:160
ErrorCode get_code() const
Creates new Source objects.
Definition: SourceFactory.h:22
virtual const Observation * get_info() const =0
Get the const Observation attributes that describe the source.
std::string application_name
The name of the application.
Definition: SingleThread.h:214
@ Constructed
request to construct
Definition: SingleThread.h:123
bool weighted_time_series
use weighted time series to flag bad data
Definition: SingleThread.h:270
Per-thread configuration options.
Definition: SingleThread.h:185
void insert_mask(const std::string &transformation_name)
Insert a mask before the named operation.
Definition: SingleThread.C:502
bool run_repeatedly
run repeatedly on the same input
Definition: SingleThread.h:248
Reference::To< TimeSeries > source_output
The TimeSeries into which the Source outputs data.
Definition: SingleThread.h:150
Multiple pipeline threads.
Definition: MultiThread.h:21
virtual void set_quiet()
Operate in quiet mode.
Definition: SingleThread.C:1031
void set_output(FILE *)
Set the output stream to which data will be dumped.
Definition: Dump.C:28
~SingleThread()
Destructor.
Definition: SingleThread.C:67
@ Prepared
request to prepare
Definition: SingleThread.h:125
Functor< void(Source *) > source_prepare
external function used to prepare the input each time it is opened
Definition: SingleThread.h:217
const std::string & app()
The name of the application running this thread.
Definition: SingleThread.C:538
Manages CUDA device memory allocation and destruction.
Definition: MemoryCUDA.h:32
virtual void add_options(CommandLine::Menu &)
Add command line options.
Definition: SingleThread.C:944
uint64_t get_minimum_samples() const
Get the minimum number of samples required to process.
Definition: SingleThread.C:540
Type * release()
static bool record_time
Global flag enables stopwatch to record the time spent operating.
Definition: Operation.h:42
Type * get() const
Scratch space that can be shared between Operations.
Definition: Scratch.h:27
uint64_t minimum_samples
The minimum number of samples required to process.
Definition: SingleThread.h:172
std::string get_name() const
Return the unique name of this operation.
Definition: Operation.h:95
void initialize()
Initialize resources required by signal procesing pipline.
Definition: SingleThread.C:104
Defines the interface by which operations are performed on data.
Definition: Operation.h:37
static bool auto_delete
Automatically delete arrays on resize(0)
Definition: TimeSeries.h:50
void take_ostream(std::ostream *newlog)
Take and manage a new ostream instance.
Definition: SingleThread.C:83
Container of weighted time-major order floating point data.
Definition: WeightedTimeSeries.h:26
Stores information about digital, band-limited, time-varying signals.
Definition: Observation.h:33
TextEditor< Observation > editor
The editor used to set Observation attributes via the command line.
Definition: SingleThread.h:239
virtual void open(const std::vector< std::string > &new_filenames)
Open a number of files and treat them as one logical observation composed of multiple parallel stream...
Definition: MultiFile.C:95
static bool report_time
Global flag enables report of time spent in operation on descruction.
Definition: Operation.h:45
void construct()
Build the signal processing pipeline.
Definition: SingleThread.C:230
void set_nthread(unsigned)
set the number of CPU threads to be used
Definition: SingleThread.C:884
virtual void set_input(const In *_input)
Set the container from which input data will be read.
Definition: HasInput.h:38
Source * open(int argc, char **argv)
Create new Input based on command line options.
Definition: SingleThread.C:806
bool input_buffering
use input-buffering to compensate for operation edge effects
Definition: SingleThread.h:267
Arrays of consecutive samples for each polarization and frequency channel.
Definition: TimeSeries.h:29
Abstract base class of data reduction pipelines.
Definition: Pipeline.h:28
ThreadContext * source_context
Mutex protecting source.
Definition: SingleThread.h:141
unsigned nthread
number of CPU threads
Definition: SingleThread.h:308
bool report_vitals
report vital statistics
Definition: SingleThread.h:242
virtual void share(SingleThread *)
Share any necessary resources with the specified thread.
Definition: SingleThread.C:175
@ Idle
an error has occurred
Definition: SingleThread.h:121
Allow ascii_header-style params to be entered on the command line.
Definition: CommandLineHeader.h:22
std::vector< std::string > mask_before
mask points
Definition: SingleThread.h:276
virtual void seek_time(double second)=0
Seek to the specified time in seconds.
SingleThread()
Constructor.
Definition: SingleThread.C:50
unsigned get_num_libraries()
Dumps TimeSeries data to file in either binary or ascii format.
Definition: Dump.h:27
std::string convert(int argc, char **argv, std::string filename="")
Parse the given argc, argv into a header file, return file name.
Definition: CommandLineHeader.C:15
static bool verbose
Global verbosity flag.
Definition: Operation.h:48
virtual void add(Item *)
std::string get_library()
TimeSeries * new_time_series()
Create a new TimeSeries instance.
Definition: SingleThread.C:212
void seek_epoch(const MJD &)
Seek such that output starts at the specified epoch.
Definition: SingleThread.C:336
uint64_t set_block_size(uint64_t minimum_samples, uint64_t input_overlap=0)
Set the number of time samples per block, based on RAM constraints.
Definition: SingleThread.C:358
virtual void end_of_data()
Any special operations that must be performed at the end of data.
Definition: SingleThread.C:762
void force_contiguity()
Treat the files as contiguous.
Definition: SerialFiles.C:30
void set_output_binary(bool flag=true)
Set the flag to output binary data.
Definition: Dump.C:35
void set_source(Source *)
Set the Source from which data are obtained.
Definition: SingleThread.C:144
Pure virtual base class of sources that can seek through data.
Definition: Seekable.h:29
SingleThread * colleague
Processing thread with whom sharing will occur.
Definition: SingleThread.h:144
@ Procedural
operations that produce data
Definition: Operation.h:116
Config()
Default constructor.
Definition: SingleThread.C:767
ThreadContext * state_change
State change communication.
Definition: SingleThread.h:138
static Mask * factory(const std::string &descriptor)
Factory creates a new Mask-derived transformation object.
Definition: Mask.C:115
Reference::To< Config > config
Configuration information.
Definition: SingleThread.h:153
Share * clone(HasInput< TimeSeries > *)
Sort-of clone.
Definition: InputBufferingShare.C:43
virtual void set_very_verbose()
Operate in very verbose mode.
Definition: SingleThread.C:1043
Loads serial BitSeries data from multiple files.
Definition: SerialFiles.h:24
const Type * ptr() const
Reference::To< Source > source
Manages loading and unpacking.
Definition: SingleThread.h:147
State state
Processing state.
Definition: SingleThread.h:132
void finish()
Finish everything.
Definition: SingleThread.C:751
bool report_done
report the percentage finished
Definition: SingleThread.h:245
std::vector< unsigned > affinity
CPUs on which threads will run.
Definition: SingleThread.h:305
void set_cuda_device(std::string)
set the cuda devices to be used
Definition: SingleThread.C:901
unsigned get_nbuffers() const
get the number of buffers required to process the data
Definition: SingleThread.h:279
virtual void set_total_time(double second)=0
Truncate the time series at the specified second.
std::ostream * log
Pointer to the ostream.
Definition: SingleThread.h:115
A single Pipeline thread.
Definition: SingleThread.h:32
@ Joined
processing completed
Definition: SingleThread.h:128
void set_library(const std::string &name)
void run()
Run through the data.
Definition: SingleThread.C:546
std::vector< std::string > dump_before
dump points
Definition: SingleThread.h:273
MJD get_start_time() const
Return the start time of the leading edge of the first time sample.
Definition: Observation.h:152
Abstract interface to sources of TimeSeries data.
Definition: Source.h:22
Pure virtual base class of objects that manage memory allocation and destruction.
Definition: Memory.h:23
@ Prepare
construction completed
Definition: SingleThread.h:124
@ Done
processing started
Definition: SingleThread.h:127
virtual void prepare(Source *)
Prepare the input according to the configuration.
Definition: SingleThread.C:858
std::vector< unsigned > cuda_device
CUDA devices on which computations will take place.
Definition: SingleThread.h:299
void set_affinity(std::string)
set the cpus on which each thread will run
Definition: SingleThread.C:911
Buffers the Transformation input.
Definition: InputBufferingShare.h:19
void set_configuration(Config *)
Set the configuration.
Definition: SingleThread.C:71
std::ostream cerr
The verbose output stream shared by all operations.
Definition: SingleThread.h:86
void set_overlap_buffer_memory(Memory *memory)
Set the memory type used in the overlap buffer.
Definition: Seekable.C:328
const std::vector< Reference::To< Operation > > get_operations()
get the operations being performed
Definition: SingleThread.h:95
unsigned buffers
number of buffers that have been created by new_time_series
Definition: SingleThread.h:311

Generated using doxygen 1.8.17