diff --git a/src/jetzig/App.zig b/src/jetzig/App.zig index a74172e..4e209cd 100644 --- a/src/jetzig/App.zig +++ b/src/jetzig/App.zig @@ -82,7 +82,7 @@ pub fn start(self: App, routes_module: type, options: AppOptions) !void { var log_thread = try std.Thread.spawn( .{ .allocator = self.allocator }, jetzig.loggers.LogQueue.Reader.publish, - .{server_options.log_queue.reader}, + .{ &server_options.log_queue.reader, .{} }, ); defer log_thread.join(); diff --git a/src/jetzig/Environment.zig b/src/jetzig/Environment.zig index e122064..ce458a8 100644 --- a/src/jetzig/Environment.zig +++ b/src/jetzig/Environment.zig @@ -64,7 +64,7 @@ pub fn getServerOptions(self: Environment) !jetzig.http.Server.ServerOptions { defer options.deinit(); const log_queue = try self.allocator.create(jetzig.loggers.LogQueue); - log_queue.* = try jetzig.loggers.LogQueue.init(self.allocator); + log_queue.* = jetzig.loggers.LogQueue.init(self.allocator); try log_queue.setFiles( try getLogFile(.stdout, options.options), try getLogFile(.stderr, options.options), diff --git a/src/jetzig/http/Headers.zig b/src/jetzig/http/Headers.zig index 5533ec2..ad54f7b 100644 --- a/src/jetzig/http/Headers.zig +++ b/src/jetzig/http/Headers.zig @@ -78,7 +78,8 @@ pub fn append(self: *Headers, name: []const u8, value: []const u8) !void { test "append (deprecated)" { const allocator = std.testing.allocator; - var headers = Headers.init(allocator, try HttpzKeyValue.init(allocator, 10)); + var httpz_headers = try HttpzKeyValue.init(allocator, 10); + var headers = Headers.init(allocator, &httpz_headers); defer headers.deinit(); try headers.append("foo", "bar"); try std.testing.expectEqualStrings(headers.get("foo").?, "bar"); @@ -86,7 +87,8 @@ test "append (deprecated)" { test "add" { const allocator = std.testing.allocator; - var headers = Headers.init(allocator, try HttpzKeyValue.init(allocator, 10)); + var httpz_headers = try HttpzKeyValue.init(allocator, 10); + var headers = Headers.init(allocator, &httpz_headers); defer headers.deinit(); try headers.append("foo", "bar"); try std.testing.expectEqualStrings(headers.get("foo").?, "bar"); @@ -94,7 +96,8 @@ test "add" { test "get with multiple headers (bugfix regression test)" { const allocator = std.testing.allocator; - var headers = Headers.init(allocator, try HttpzKeyValue.init(allocator, 10)); + var httpz_headers = try HttpzKeyValue.init(allocator, 10); + var headers = Headers.init(allocator, &httpz_headers); defer headers.deinit(); try headers.append("foo", "bar"); try headers.append("bar", "baz"); @@ -103,7 +106,8 @@ test "get with multiple headers (bugfix regression test)" { test "getAll" { const allocator = std.testing.allocator; - var headers = Headers.init(allocator, try HttpzKeyValue.init(allocator, 10)); + var httpz_headers = try HttpzKeyValue.init(allocator, 10); + var headers = Headers.init(allocator, &httpz_headers); defer headers.deinit(); try headers.append("foo", "bar"); try headers.append("foo", "baz"); @@ -115,7 +119,8 @@ test "getAll" { test "add too many headers" { const allocator = std.testing.allocator; - var headers = Headers.init(allocator, try HttpzKeyValue.init(allocator, 10)); + var httpz_headers = try HttpzKeyValue.init(allocator, 10); + var headers = Headers.init(allocator, &httpz_headers); defer headers.deinit(); for (0..10) |_| try headers.append("foo", "bar"); @@ -124,7 +129,8 @@ test "add too many headers" { test "case-insensitive matching" { const allocator = std.testing.allocator; - var headers = Headers.init(allocator, try HttpzKeyValue.init(allocator, 10)); + var httpz_headers = try HttpzKeyValue.init(allocator, 10); + var headers = Headers.init(allocator, &httpz_headers); defer headers.deinit(); try headers.append("Content-Type", "bar"); try std.testing.expectEqualStrings(headers.get("content-type").?, "bar"); diff --git a/src/jetzig/loggers/DevelopmentLogger.zig b/src/jetzig/loggers/DevelopmentLogger.zig index 8bcc85a..28cb0ad 100644 --- a/src/jetzig/loggers/DevelopmentLogger.zig +++ b/src/jetzig/loggers/DevelopmentLogger.zig @@ -51,7 +51,7 @@ pub fn log( const level_formatted = if (colorized) colorizedLogLevel(level) else @tagName(level); const target = jetzig.loggers.logTarget(level); - try self.log_queue.writer.print( + try self.log_queue.print( "{s: >5} [{s}] {s}\n", .{ level_formatted, iso8601, output }, target, @@ -87,7 +87,7 @@ pub fn logRequest(self: DevelopmentLogger, request: *const jetzig.http.Request) var timestamp_buf: [256]u8 = undefined; const iso8601 = try timestamp.iso8601(×tamp_buf); - try self.log_queue.writer.print("{s: >5} [{s}] [{s}/{s}/{s}] {s}\n", .{ + try self.log_queue.print("{s: >5} [{s}] [{s}/{s}/{s}] {s}\n", .{ if (self.stdout_colorized) colorizedLogLevel(.INFO) else @tagName(.INFO), iso8601, formatted_duration, diff --git a/src/jetzig/loggers/JsonLogger.zig b/src/jetzig/loggers/JsonLogger.zig index 135e265..5beb3e3 100644 --- a/src/jetzig/loggers/JsonLogger.zig +++ b/src/jetzig/loggers/JsonLogger.zig @@ -59,7 +59,7 @@ pub fn log( const json = try std.json.stringifyAlloc(self.allocator, log_message, .{ .whitespace = .minified }); defer self.allocator.free(json); - try self.log_queue.writer.print("{s}\n", .{json}, jetzig.loggers.logTarget(level)); + try self.log_queue.print("{s}\n", .{json}, jetzig.loggers.logTarget(level)); } /// Log a one-liner including response status code, path, method, duration, etc. @@ -98,7 +98,7 @@ pub fn logRequest(self: *const JsonLogger, request: *const jetzig.http.Request) } }; - try self.log_queue.writer.print("{s}\n", .{stream.getWritten()}, .stdout); + try self.log_queue.print("{s}\n", .{stream.getWritten()}, .stdout); } fn getFile(self: JsonLogger, level: LogLevel) std.fs.File { diff --git a/src/jetzig/loggers/LogQueue.zig b/src/jetzig/loggers/LogQueue.zig index 4520867..6d9c40e 100644 --- a/src/jetzig/loggers/LogQueue.zig +++ b/src/jetzig/loggers/LogQueue.zig @@ -2,18 +2,22 @@ const std = @import("std"); const jetzig = @import("../../jetzig.zig"); -const List = std.DoublyLinkedList(Event); const buffer_size = jetzig.config.get(usize, "log_message_buffer_len"); +const List = std.DoublyLinkedList(Event); +const Buffer = [buffer_size]u8; + allocator: std.mem.Allocator, +node_allocator: std.heap.MemoryPool(List.Node), +buffer_allocator: std.heap.MemoryPool(Buffer), list: List, -read_write_mutex: *std.Thread.Mutex, -condition: *std.Thread.Condition, -condition_mutex: *std.Thread.Mutex, -writer: *Writer = undefined, -reader: *Reader = undefined, +read_write_mutex: std.Thread.Mutex, +condition: std.Thread.Condition, +condition_mutex: std.Thread.Mutex, +writer: Writer = undefined, +reader: Reader = undefined, node_pool: std.ArrayList(*List.Node), -buffer_pool: std.ArrayList(*[buffer_size:0]u8), +buffer_pool: std.ArrayList(*Buffer), position: usize, stdout_is_tty: bool = undefined, stderr_is_tty: bool = undefined, @@ -24,34 +28,51 @@ const LogQueue = @This(); pub const Target = enum { stdout, stderr }; const Event = struct { - message: *[buffer_size:0]u8, + message: *Buffer, len: usize, target: Target, ptr: ?[]const u8, }; -pub fn init(allocator: std.mem.Allocator) !LogQueue { +/// Create a new `LogQueue`. +pub fn init(allocator: std.mem.Allocator) LogQueue { return .{ .allocator = allocator, + .node_allocator = std.heap.MemoryPool(List.Node).init(allocator), + .buffer_allocator = std.heap.MemoryPool(Buffer).init(allocator), .list = List{}, - .condition = try allocator.create(std.Thread.Condition), - .condition_mutex = try allocator.create(std.Thread.Mutex), - .read_write_mutex = try allocator.create(std.Thread.Mutex), + .condition = std.Thread.Condition{}, + .condition_mutex = std.Thread.Mutex{}, + .read_write_mutex = std.Thread.Mutex{}, .node_pool = std.ArrayList(*List.Node).init(allocator), - .buffer_pool = std.ArrayList(*[buffer_size:0]u8).init(allocator), + .buffer_pool = std.ArrayList(*Buffer).init(allocator), .position = 0, }; } -/// Set the stdout and stderr outputs. +/// Free allocated resources and return to `pending` state. +pub fn deinit(self: *LogQueue) void { + self.node_pool.deinit(); + self.buffer_pool.deinit(); + + while (self.list.popFirst()) |node| { + if (node.data.ptr) |ptr| self.allocator.free(ptr); + self.allocator.destroy(node.data.message); + } + + self.buffer_allocator.deinit(); + self.node_allocator.deinit(); + + self.state = .pending; +} + +/// Set the stdout and stderr outputs. Must be called before `print`. pub fn setFiles(self: *LogQueue, stdout_file: std.fs.File, stderr_file: std.fs.File) !void { - self.writer = try self.allocator.create(Writer); - self.writer.* = Writer{ + self.writer = Writer{ .queue = self, - .mutex = try self.allocator.create(std.Thread.Mutex), + .mutex = std.Thread.Mutex{}, }; - self.reader = try self.allocator.create(Reader); - self.reader.* = Reader{ + self.reader = Reader{ .stdout_file = stdout_file, .stderr_file = stderr_file, .queue = self, @@ -61,11 +82,17 @@ pub fn setFiles(self: *LogQueue, stdout_file: std.fs.File, stderr_file: std.fs.F self.state = .ready; } +pub fn print(self: *LogQueue, comptime message: []const u8, args: anytype, target: Target) !void { + std.debug.assert(self.state == .ready); + + try self.writer.print(message, args, target); +} + /// Writer for `LogQueue`. Receives log events and publishes to the queue. pub const Writer = struct { queue: *LogQueue, position: usize = 0, - mutex: *std.Thread.Mutex, + mutex: std.Thread.Mutex, /// Print a log event. Messages longer than `jetzig.config.get(usize, "log_message_buffer_len")` /// spill to heap with degraded performance. Adjust buffer length or limit long entries to @@ -77,8 +104,6 @@ pub const Writer = struct { args: anytype, target: Target, ) !void { - std.debug.assert(self.queue.state == .ready); - self.mutex.lock(); defer self.mutex.unlock(); @@ -86,7 +111,7 @@ pub const Writer = struct { self.position += 1; var ptr: ?[]const u8 = null; - const result = std.fmt.bufPrintZ(buf, message, args) catch |err| switch (err) { + const result = std.fmt.bufPrint(buf, message, args) catch |err| switch (err) { error.NoSpaceLeft => blk: { ptr = try std.fmt.allocPrint(self.queue.allocator, message, args); self.position -= 1; @@ -102,9 +127,9 @@ pub const Writer = struct { }); } - fn getBuffer(self: *Writer) !*[buffer_size:0]u8 { + fn getBuffer(self: *Writer) !*Buffer { const buffer = if (self.position >= self.queue.buffer_pool.items.len) - try self.queue.allocator.create([buffer_size:0]u8) + try self.queue.buffer_allocator.create() else self.queue.buffer_pool.items[self.position]; @@ -112,7 +137,8 @@ pub const Writer = struct { } }; -/// Reader for `LogQueue`. Reads log events from the queue. +/// Reader for `LogQueue`. Reads log events from the queue and writes them to the designated +/// target (stdout or stderr). pub const Reader = struct { stdout_file: std.fs.File, stderr_file: std.fs.File, @@ -120,7 +146,7 @@ pub const Reader = struct { /// Publish log events from the queue. Invoke from a dedicated thread. Sleeps when log queue /// is empty, wakes up when a new event is published. - pub fn publish(self: *Reader) !void { + pub fn publish(self: *Reader, options: struct { oneshot: bool = false }) !void { std.debug.assert(self.queue.state == .ready); const stdout_writer = self.stdout_file.writer(); @@ -130,7 +156,7 @@ pub const Reader = struct { self.queue.condition_mutex.lock(); defer self.queue.condition_mutex.unlock(); - self.queue.condition.wait(self.queue.condition_mutex); + if (!options.oneshot) self.queue.condition.wait(&self.queue.condition_mutex); var stdout_written = false; var stderr_written = false; @@ -170,6 +196,8 @@ pub const Reader = struct { if (stdout_written and !self.queue.stdout_is_tty) try self.stdout_file.sync(); if (stderr_written and !self.queue.stderr_is_tty) try self.stderr_file.sync(); + + if (options.oneshot) break; } } }; @@ -181,7 +209,7 @@ fn append(self: *LogQueue, event: Event) !void { defer self.read_write_mutex.unlock(); const node = if (self.position >= self.node_pool.items.len) - try self.allocator.create(List.Node) + try self.node_allocator.create() else self.node_pool.items[self.position]; @@ -212,17 +240,71 @@ fn popFirst(self: *LogQueue) !?Event { } } -test "setFiles" { - var log_queue = try LogQueue.init(std.testing.allocator); +test "print to stdout and stderr" { + var log_queue = LogQueue.init(std.testing.allocator); + defer log_queue.deinit(); var tmp_dir = std.testing.tmpDir(.{}); defer tmp_dir.cleanup(); - const stdout = tmp_dir.dir.createFile("stdout.log", .{}); + const stdout = try tmp_dir.dir.createFile("stdout.log", .{ .read = true }); defer stdout.close(); - const stderr = tmp_dir.dir.createFile("stderr.log", .{}); + const stderr = try tmp_dir.dir.createFile("stderr.log", .{ .read = true }); defer stderr.close(); try log_queue.setFiles(stdout, stderr); + try log_queue.print("foo {s}\n", .{"bar"}, .stdout); + try log_queue.print("baz {s}\n", .{"qux"}, .stderr); + try log_queue.print("quux {s}\n", .{"corge"}, .stdout); + try log_queue.print("grault {s}\n", .{"garply"}, .stderr); + try log_queue.print("waldo {s}\n", .{"fred"}, .stderr); + try log_queue.print("plugh {s}\n", .{"zyzzy"}, .stdout); + + try log_queue.reader.publish(.{ .oneshot = true }); + + try stdout.seekTo(0); + var buf: [1024]u8 = undefined; + var len = try stdout.readAll(&buf); + + try std.testing.expectEqualStrings( + \\foo bar + \\quux corge + \\plugh zyzzy + \\ + , buf[0..len]); + + try stderr.seekTo(0); + len = try stderr.readAll(&buf); + try std.testing.expectEqualStrings( + \\baz qux + \\grault garply + \\waldo fred + \\ + , buf[0..len]); +} + +test "long messages" { + var log_queue = LogQueue.init(std.testing.allocator); + defer log_queue.deinit(); + + var tmp_dir = std.testing.tmpDir(.{}); + defer tmp_dir.cleanup(); + + const stdout = try tmp_dir.dir.createFile("stdout.log", .{ .read = true }); + defer stdout.close(); + + const stderr = try tmp_dir.dir.createFile("stderr.log", .{ .read = true }); + defer stderr.close(); + + try log_queue.setFiles(stdout, stderr); + try log_queue.print("foo" ** buffer_size, .{}, .stdout); + + try log_queue.reader.publish(.{ .oneshot = true }); + + try stdout.seekTo(0); + var buf: [buffer_size * 3]u8 = undefined; + const len = try stdout.readAll(&buf); + + try std.testing.expectEqualStrings("foo" ** buffer_size, buf[0..len]); }