1 module dli.string_stream.input_string_stream; 2 3 import core.sync.semaphore; 4 import std.exception; 5 import std..string; 6 7 public enum char eof = '\x04'; 8 9 public shared class InputStringStream 10 { 11 private string _content = ""; 12 13 @property 14 public string content() const 15 out(s) 16 { 17 assert(s !is null); 18 } 19 body 20 { 21 return _content; 22 } 23 24 private Semaphore linesAvailableSemaphore; 25 26 public immutable string lineTerminator; 27 28 public this(string content = "", string lineTerminator = "\n") 29 in 30 { 31 assert(content !is null); 32 assert(lineTerminator !is null); 33 } 34 body 35 { 36 linesAvailableSemaphore = cast(shared) new Semaphore(); 37 this.lineTerminator = lineTerminator; 38 39 append(content); 40 } 41 42 public final void appendLine(string content) 43 in 44 { 45 assert(content !is null); 46 } 47 body 48 { 49 append(content ~ lineTerminator); 50 } 51 52 public final void append(string content) 53 in 54 { 55 assert(content !is null); 56 } 57 body 58 { 59 // Determine number of lines that will become available to read after this append. 60 immutable size_t linesInContent = content.count(lineTerminator) + content.count(eof) * 2; 61 62 synchronized(this) 63 _content ~= content; 64 65 for (size_t i; i < linesInContent; i++) 66 (cast() linesAvailableSemaphore).notify(); 67 } 68 69 public string readln() 70 { 71 import std..string : indexOf; 72 import std.algorithm.comparison : min; 73 74 (cast() linesAvailableSemaphore).wait(); 75 76 synchronized(this) 77 { 78 if(_content[0] == eof) 79 { 80 _content = _content[1..$]; 81 return null; 82 } 83 else 84 { 85 86 immutable ptrdiff_t nextLineTerminatorPosition = _content.indexOf(lineTerminator); 87 immutable ptrdiff_t nextEOFPosition = _content.indexOf(eof); 88 immutable ptrdiff_t lineUpperLimit = 89 (nextEOFPosition != -1) && (nextLineTerminatorPosition != -1) ? 90 min(nextEOFPosition, nextLineTerminatorPosition + lineTerminator.length) : 91 (nextEOFPosition != -1) ? 92 nextEOFPosition : 93 nextLineTerminatorPosition + lineTerminator.length; 94 // Extract line and remove it from _content 95 string line = _content[0..lineUpperLimit]; 96 _content = _content[lineUpperLimit..$]; 97 return line; 98 } 99 100 } 101 } 102 } 103 104 // TESTS 105 version(unittest) 106 { 107 import unit_threaded : shouldEqual; 108 @("InputStringStream reads out contents one line at a time regardless of line terminator") 109 unittest 110 { 111 string line1 = "This is line 1"; 112 string line2 = "This is line 2"; 113 114 string lineTerminator1 = "\n"; 115 116 auto stream1 = new shared InputStringStream(line1 ~ lineTerminator1 ~ 117 line2 ~ lineTerminator1, lineTerminator1); 118 119 stream1.readln().shouldEqual(line1 ~ lineTerminator1); 120 stream1.readln().shouldEqual(line2 ~ lineTerminator1); 121 122 string lineTerminator2 = "\n\r"; 123 124 auto stream2 = new shared InputStringStream(line1 ~ lineTerminator2 ~ 125 line2 ~ lineTerminator2, lineTerminator2); 126 127 stream2.readln().shouldEqual(line1 ~ lineTerminator2); 128 stream2.readln().shouldEqual(line2 ~ lineTerminator2); 129 } 130 131 @("InputStringStream blocks threads on readln until a line is available") 132 unittest 133 { 134 import core.thread : ThreadGroup; 135 import core.atomic : atomicOp; 136 137 138 auto stream = new shared InputStringStream(); 139 immutable string sampleLine = "This is a sample line\n"; 140 141 ThreadGroup threads = new ThreadGroup(); 142 enum lines = 20; 143 shared size_t linesRead; 144 145 for(size_t i; i < lines; i++) 146 { 147 threads.create( 148 { 149 stream.readln().shouldEqual(sampleLine); 150 linesRead.atomicOp!"+="(1); 151 }); 152 153 threads.create( 154 { 155 stream.append(sampleLine); 156 }); 157 } 158 159 threads.joinAll(); 160 161 linesRead.shouldEqual(lines); 162 } 163 164 @("InputStringStream.readln() returns null if the next character is eof") 165 unittest 166 { 167 auto stream = new shared InputStringStream(); 168 stream.append("" ~ eof ~ "asdf"); 169 stream.readln().shouldEqual(null); 170 } 171 172 @("InputStringStream.readln() returns content before eof, up to a line") 173 unittest 174 { 175 auto stream = new shared InputStringStream(); 176 stream.append("abc" ~ eof); 177 stream.readln().shouldEqual("abc"); 178 stream.readln().shouldEqual(null); 179 180 stream.appendLine("def"); 181 stream.append("" ~ eof); 182 stream.readln().shouldEqual("def" ~ stream.lineTerminator); 183 stream.readln().shouldEqual(null); 184 185 stream.appendLine("abc"); 186 stream.appendLine("def"); 187 stream.append("" ~ eof); 188 assert(stream.readln() == "abc" ~ stream.lineTerminator); 189 assert(stream.readln() == "def" ~ stream.lineTerminator); 190 assert(stream.readln() is null); 191 } 192 }