This commit is contained in:
Bob Farrell 2024-05-10 19:42:26 +01:00
parent a155e33121
commit 997ac4ec50
2 changed files with 48 additions and 20 deletions

View File

@ -134,7 +134,7 @@ pub fn deinit(self: *Request) void {
if (self.processed) self.allocator.free(self.body); if (self.processed) self.allocator.free(self.body);
} }
/// Process request, read body if present, parse headers (TODO) /// Process request, read body if present, parse headers
pub fn process(self: *Request) !void { pub fn process(self: *Request) !void {
var cookie: ?[]const u8 = null; var cookie: ?[]const u8 = null;

View File

@ -1,6 +1,8 @@
const std = @import("std"); const std = @import("std");
const List = std.DoublyLinkedList([]const u8); const List = std.DoublyLinkedList(*[buffer_size:0]u8);
const buffer_size = 4096;
allocator: std.mem.Allocator, allocator: std.mem.Allocator,
list: *List, list: *List,
read_write_mutex: *std.Thread.Mutex, read_write_mutex: *std.Thread.Mutex,
@ -8,14 +10,15 @@ condition: *std.Thread.Condition,
condition_mutex: *std.Thread.Mutex, condition_mutex: *std.Thread.Mutex,
writer: *Writer = undefined, writer: *Writer = undefined,
reader: *Reader = undefined, reader: *Reader = undefined,
pool: std.ArrayList(*List.Node), node_pool: std.ArrayList(*List.Node),
buffer_pool: std.ArrayList(*[buffer_size:0]u8),
position: usize, position: usize,
is_tty: bool = undefined, is_tty: bool = undefined,
const LogQueue = @This(); const LogQueue = @This();
pub fn init(allocator: std.mem.Allocator) !LogQueue { pub fn init(allocator: std.mem.Allocator) !LogQueue {
const list = try allocator.create(std.DoublyLinkedList([]const u8)); const list = try allocator.create(std.DoublyLinkedList(*[buffer_size:0]u8));
list.* = .{}; list.* = .{};
return .{ return .{
@ -24,14 +27,15 @@ pub fn init(allocator: std.mem.Allocator) !LogQueue {
.condition = try allocator.create(std.Thread.Condition), .condition = try allocator.create(std.Thread.Condition),
.condition_mutex = try allocator.create(std.Thread.Mutex), .condition_mutex = try allocator.create(std.Thread.Mutex),
.read_write_mutex = try allocator.create(std.Thread.Mutex), .read_write_mutex = try allocator.create(std.Thread.Mutex),
.pool = std.ArrayList(*List.Node).init(allocator), .node_pool = std.ArrayList(*List.Node).init(allocator),
.buffer_pool = std.ArrayList(*[buffer_size:0]u8).init(allocator),
.position = 0, .position = 0,
}; };
} }
pub fn setFile(self: *LogQueue, file: std.fs.File) !void { pub fn setFile(self: *LogQueue, file: std.fs.File) !void {
self.writer = try self.allocator.create(Writer); self.writer = try self.allocator.create(Writer);
self.writer.* = Writer{ .file = file, .queue = self }; self.writer.* = Writer{ .file = file, .queue = self, .mutex = try self.allocator.create(std.Thread.Mutex) };
self.reader = try self.allocator.create(Reader); self.reader = try self.allocator.create(Reader);
self.reader.* = Reader{ .file = file, .queue = self }; self.reader.* = Reader{ .file = file, .queue = self };
self.is_tty = file.isTty(); self.is_tty = file.isTty();
@ -41,6 +45,8 @@ pub fn setFile(self: *LogQueue, file: std.fs.File) !void {
pub const Writer = struct { pub const Writer = struct {
file: std.fs.File, file: std.fs.File,
queue: *LogQueue, queue: *LogQueue,
position: usize = 0,
mutex: *std.Thread.Mutex,
/// True if target output file is a TTY. /// True if target output file is a TTY.
pub fn isTty(self: Writer) bool { pub fn isTty(self: Writer) bool {
@ -49,8 +55,22 @@ pub const Writer = struct {
/// Print a log event. /// Print a log event.
pub fn print(self: *Writer, comptime message: []const u8, args: anytype) !void { pub fn print(self: *Writer, comptime message: []const u8, args: anytype) !void {
const output = try std.fmt.allocPrint(self.queue.allocator, message, args); self.mutex.lock();
try self.queue.append(output); defer self.mutex.unlock();
const buf = try self.getBuffer();
self.position += 1;
_ = try std.fmt.bufPrintZ(buf, message, args);
try self.queue.append(buf);
}
fn getBuffer(self: *Writer) !*[buffer_size:0]u8 {
const buffer = if (self.position >= self.queue.buffer_pool.items.len)
try self.queue.allocator.create([buffer_size:0]u8)
else
self.queue.buffer_pool.items[self.position];
return buffer;
} }
}; };
@ -70,8 +90,16 @@ pub const Reader = struct {
self.queue.condition.wait(self.queue.condition_mutex); self.queue.condition.wait(self.queue.condition_mutex);
while (try self.queue.popFirst()) |message| { while (try self.queue.popFirst()) |message| {
defer self.queue.allocator.free(message); self.queue.writer.mutex.lock();
try writer.writeAll(message); defer self.queue.writer.mutex.unlock();
try writer.writeAll(message[0..std.mem.indexOfSentinel(u8, 0, message)]);
self.queue.writer.position -= 1;
if (self.queue.writer.position < self.queue.buffer_pool.items.len) {
self.queue.buffer_pool.items[self.queue.writer.position] = message;
} else {
try self.queue.buffer_pool.append(message);
}
} }
if (!self.file.isTty()) try self.file.sync(); if (!self.file.isTty()) try self.file.sync();
@ -81,15 +109,15 @@ pub const Reader = struct {
// Append a log event to the queue. Signal the publish loop thread to wake up. Recycle nodes if // Append a log event to the queue. Signal the publish loop thread to wake up. Recycle nodes if
// available in the pool, otherwise create a new one. // available in the pool, otherwise create a new one.
fn append(self: *LogQueue, message: []const u8) !void { fn append(self: *LogQueue, message: *[buffer_size:0]u8) !void {
self.read_write_mutex.lock(); self.read_write_mutex.lock();
defer self.read_write_mutex.unlock(); defer self.read_write_mutex.unlock();
const node = if (self.position >= self.pool.items.len) blk: { const node = if (self.position >= self.node_pool.items.len)
break :blk try self.allocator.create(List.Node); try self.allocator.create(List.Node)
} else blk: { else
break :blk self.pool.items[self.position]; self.node_pool.items[self.position];
};
self.position += 1; self.position += 1;
node.* = .{ .data = message }; node.* = .{ .data = message };
@ -99,17 +127,17 @@ fn append(self: *LogQueue, message: []const u8) !void {
} }
// Pop a log event from the queue. Return node to the pool for re-use. // Pop a log event from the queue. Return node to the pool for re-use.
fn popFirst(self: *LogQueue) !?[]const u8 { fn popFirst(self: *LogQueue) !?*[buffer_size:0]u8 {
self.read_write_mutex.lock(); self.read_write_mutex.lock();
defer self.read_write_mutex.unlock(); defer self.read_write_mutex.unlock();
if (self.list.popFirst()) |node| { if (self.list.popFirst()) |node| {
const value = node.data; const value = node.data;
self.position -= 1; self.position -= 1;
if (self.position < self.pool.items.len) { if (self.position < self.node_pool.items.len) {
self.pool.items[self.position] = node; self.node_pool.items[self.position] = node;
} else { } else {
try self.pool.append(node); // TODO: Set a maximum here to avoid never-ending inflation. try self.node_pool.append(node); // TODO: Set a maximum here to avoid never-ending inflation.
} }
return value; return value;
} else { } else {