From 997ac4ec508fd5c4455faa3575b3c89d9f65f43b Mon Sep 17 00:00:00 2001 From: Bob Farrell Date: Fri, 10 May 2024 19:42:26 +0100 Subject: [PATCH] WIP --- src/jetzig/http/Request.zig | 2 +- src/jetzig/loggers/LogQueue.zig | 66 +++++++++++++++++++++++---------- 2 files changed, 48 insertions(+), 20 deletions(-) diff --git a/src/jetzig/http/Request.zig b/src/jetzig/http/Request.zig index abd6284..a0e1902 100644 --- a/src/jetzig/http/Request.zig +++ b/src/jetzig/http/Request.zig @@ -134,7 +134,7 @@ pub fn deinit(self: *Request) void { if (self.processed) self.allocator.free(self.body); } -/// Process request, read body if present, parse headers (TODO) +/// Process request, read body if present, parse headers pub fn process(self: *Request) !void { var cookie: ?[]const u8 = null; diff --git a/src/jetzig/loggers/LogQueue.zig b/src/jetzig/loggers/LogQueue.zig index f725582..7acd4e7 100644 --- a/src/jetzig/loggers/LogQueue.zig +++ b/src/jetzig/loggers/LogQueue.zig @@ -1,6 +1,8 @@ const std = @import("std"); -const List = std.DoublyLinkedList([]const u8); +const List = std.DoublyLinkedList(*[buffer_size:0]u8); +const buffer_size = 4096; + allocator: std.mem.Allocator, list: *List, read_write_mutex: *std.Thread.Mutex, @@ -8,14 +10,15 @@ condition: *std.Thread.Condition, condition_mutex: *std.Thread.Mutex, writer: *Writer = undefined, reader: *Reader = undefined, -pool: std.ArrayList(*List.Node), +node_pool: std.ArrayList(*List.Node), +buffer_pool: std.ArrayList(*[buffer_size:0]u8), position: usize, is_tty: bool = undefined, const LogQueue = @This(); pub fn init(allocator: std.mem.Allocator) !LogQueue { - const list = try allocator.create(std.DoublyLinkedList([]const u8)); + const list = try allocator.create(std.DoublyLinkedList(*[buffer_size:0]u8)); list.* = .{}; return .{ @@ -24,14 +27,15 @@ pub fn init(allocator: std.mem.Allocator) !LogQueue { .condition = try allocator.create(std.Thread.Condition), .condition_mutex = try allocator.create(std.Thread.Mutex), .read_write_mutex = try allocator.create(std.Thread.Mutex), - .pool = std.ArrayList(*List.Node).init(allocator), + .node_pool = std.ArrayList(*List.Node).init(allocator), + .buffer_pool = std.ArrayList(*[buffer_size:0]u8).init(allocator), .position = 0, }; } pub fn setFile(self: *LogQueue, file: std.fs.File) !void { self.writer = try self.allocator.create(Writer); - self.writer.* = Writer{ .file = file, .queue = self }; + self.writer.* = Writer{ .file = file, .queue = self, .mutex = try self.allocator.create(std.Thread.Mutex) }; self.reader = try self.allocator.create(Reader); self.reader.* = Reader{ .file = file, .queue = self }; self.is_tty = file.isTty(); @@ -41,6 +45,8 @@ pub fn setFile(self: *LogQueue, file: std.fs.File) !void { pub const Writer = struct { file: std.fs.File, queue: *LogQueue, + position: usize = 0, + mutex: *std.Thread.Mutex, /// True if target output file is a TTY. pub fn isTty(self: Writer) bool { @@ -49,8 +55,22 @@ pub const Writer = struct { /// Print a log event. pub fn print(self: *Writer, comptime message: []const u8, args: anytype) !void { - const output = try std.fmt.allocPrint(self.queue.allocator, message, args); - try self.queue.append(output); + self.mutex.lock(); + defer self.mutex.unlock(); + + const buf = try self.getBuffer(); + self.position += 1; + _ = try std.fmt.bufPrintZ(buf, message, args); + try self.queue.append(buf); + } + + fn getBuffer(self: *Writer) !*[buffer_size:0]u8 { + const buffer = if (self.position >= self.queue.buffer_pool.items.len) + try self.queue.allocator.create([buffer_size:0]u8) + else + self.queue.buffer_pool.items[self.position]; + + return buffer; } }; @@ -70,8 +90,16 @@ pub const Reader = struct { self.queue.condition.wait(self.queue.condition_mutex); while (try self.queue.popFirst()) |message| { - defer self.queue.allocator.free(message); - try writer.writeAll(message); + self.queue.writer.mutex.lock(); + defer self.queue.writer.mutex.unlock(); + + try writer.writeAll(message[0..std.mem.indexOfSentinel(u8, 0, message)]); + self.queue.writer.position -= 1; + if (self.queue.writer.position < self.queue.buffer_pool.items.len) { + self.queue.buffer_pool.items[self.queue.writer.position] = message; + } else { + try self.queue.buffer_pool.append(message); + } } if (!self.file.isTty()) try self.file.sync(); @@ -81,15 +109,15 @@ pub const Reader = struct { // Append a log event to the queue. Signal the publish loop thread to wake up. Recycle nodes if // available in the pool, otherwise create a new one. -fn append(self: *LogQueue, message: []const u8) !void { +fn append(self: *LogQueue, message: *[buffer_size:0]u8) !void { self.read_write_mutex.lock(); defer self.read_write_mutex.unlock(); - const node = if (self.position >= self.pool.items.len) blk: { - break :blk try self.allocator.create(List.Node); - } else blk: { - break :blk self.pool.items[self.position]; - }; + const node = if (self.position >= self.node_pool.items.len) + try self.allocator.create(List.Node) + else + self.node_pool.items[self.position]; + self.position += 1; node.* = .{ .data = message }; @@ -99,17 +127,17 @@ fn append(self: *LogQueue, message: []const u8) !void { } // Pop a log event from the queue. Return node to the pool for re-use. -fn popFirst(self: *LogQueue) !?[]const u8 { +fn popFirst(self: *LogQueue) !?*[buffer_size:0]u8 { self.read_write_mutex.lock(); defer self.read_write_mutex.unlock(); if (self.list.popFirst()) |node| { const value = node.data; self.position -= 1; - if (self.position < self.pool.items.len) { - self.pool.items[self.position] = node; + if (self.position < self.node_pool.items.len) { + self.node_pool.items[self.position] = node; } else { - try self.pool.append(node); // TODO: Set a maximum here to avoid never-ending inflation. + try self.node_pool.append(node); // TODO: Set a maximum here to avoid never-ending inflation. } return value; } else {