1 module dli.string_stream.input_string_stream;
2 
3 import core.sync.semaphore;
4 import std.exception;
5 import std..string;
6 
7 public enum char eof = '\x04';
8 
9 public shared class InputStringStream
10 {
11     private string _content = "";
12 
13     @property
14     public string content() const
15     out(s)
16     {
17         assert(s !is null);
18     }
19     body
20     {
21         return _content;
22     }
23 
24     private Semaphore linesAvailableSemaphore;
25 
26     public immutable string lineTerminator;
27     
28     public this(string content = "", string lineTerminator = "\n")
29     in
30     {
31         assert(content !is null);
32         assert(lineTerminator !is null);
33     }
34     body
35     {
36         linesAvailableSemaphore = cast(shared) new Semaphore();
37         this.lineTerminator = lineTerminator;
38 
39         append(content);
40     }
41 
42     public final void appendLine(string content)
43     in
44     {
45         assert(content !is null);
46     }
47     body
48     {
49         append(content ~ lineTerminator);
50     }
51 
52     public final void append(string content)
53     in
54     {
55         assert(content !is null);
56     }
57     body
58     {
59         // Determine number of lines that will become available to read after this append.
60         immutable size_t linesInContent = content.count(lineTerminator) + content.count(eof) * 2;
61 
62         synchronized(this)
63             _content ~= content;
64 
65         for (size_t i; i < linesInContent; i++)
66             (cast() linesAvailableSemaphore).notify();
67     }
68 
69     public string readln()
70     {
71         import std..string : indexOf;
72         import std.algorithm.comparison : min;
73 
74         (cast() linesAvailableSemaphore).wait();
75 
76         synchronized(this)
77         {
78             if(_content[0] == eof)
79             {
80                 _content = _content[1..$];
81                 return null;
82             }
83             else
84             {
85 
86                 immutable ptrdiff_t nextLineTerminatorPosition = _content.indexOf(lineTerminator);
87                 immutable ptrdiff_t nextEOFPosition = _content.indexOf(eof);
88                 immutable ptrdiff_t lineUpperLimit = 
89                     (nextEOFPosition != -1) && (nextLineTerminatorPosition != -1) ?
90                         min(nextEOFPosition, nextLineTerminatorPosition + lineTerminator.length) :
91                         (nextEOFPosition != -1) ?
92                             nextEOFPosition :
93                             nextLineTerminatorPosition + lineTerminator.length;
94                 // Extract line and remove it from _content
95                 string line = _content[0..lineUpperLimit];
96                 _content = _content[lineUpperLimit..$];
97                 return line;
98             }
99             
100         }
101     }
102 }
103 
104 // TESTS
105 version(unittest)
106 {
107     import unit_threaded : shouldEqual;
108     @("InputStringStream reads out contents one line at a time regardless of line terminator")
109     unittest
110     {
111         string line1 = "This is line 1";
112         string line2 = "This is line 2";
113 
114         string lineTerminator1 = "\n";
115 
116         auto stream1 = new shared InputStringStream(line1 ~ lineTerminator1 ~
117                                                     line2 ~ lineTerminator1, lineTerminator1);
118 
119         stream1.readln().shouldEqual(line1 ~ lineTerminator1);
120         stream1.readln().shouldEqual(line2 ~ lineTerminator1);
121 
122         string lineTerminator2 = "\n\r";
123 
124         auto stream2 = new shared InputStringStream(line1 ~ lineTerminator2 ~
125                                                     line2 ~ lineTerminator2, lineTerminator2);
126 
127         stream2.readln().shouldEqual(line1 ~ lineTerminator2);
128         stream2.readln().shouldEqual(line2 ~ lineTerminator2);
129     }
130 
131     @("InputStringStream blocks threads on readln until a line is available")
132     unittest
133     {
134         import core.thread : ThreadGroup;
135         import core.atomic : atomicOp;
136 
137 
138         auto stream = new shared InputStringStream();
139         immutable string sampleLine = "This is a sample line\n";
140 
141         ThreadGroup threads = new ThreadGroup();
142         enum lines = 20;
143         shared size_t linesRead;
144 
145         for(size_t i; i < lines; i++)
146         {
147             threads.create(
148                 {
149                     stream.readln().shouldEqual(sampleLine);
150                     linesRead.atomicOp!"+="(1);
151                 });
152 
153             threads.create(
154                 {
155                     stream.append(sampleLine);
156                 });
157         }
158 
159         threads.joinAll();
160 
161         linesRead.shouldEqual(lines);
162     }
163 
164     @("InputStringStream.readln() returns null if the next character is eof")
165     unittest
166     {
167         auto stream = new shared InputStringStream();
168         stream.append("" ~ eof ~ "asdf");
169         stream.readln().shouldEqual(null);
170     }
171 
172     @("InputStringStream.readln() returns content before eof, up to a line")
173     unittest
174     {
175         auto stream = new shared InputStringStream();
176         stream.append("abc" ~ eof);
177         stream.readln().shouldEqual("abc");
178         stream.readln().shouldEqual(null);
179 
180         stream.appendLine("def");
181         stream.append("" ~ eof);
182         stream.readln().shouldEqual("def" ~ stream.lineTerminator);
183         stream.readln().shouldEqual(null);
184 
185         stream.appendLine("abc");
186         stream.appendLine("def");
187         stream.append("" ~ eof);
188         assert(stream.readln() == "abc" ~ stream.lineTerminator);
189         assert(stream.readln() == "def" ~ stream.lineTerminator);
190         assert(stream.readln() is null);
191     }
192 }