This commit is contained in:
Bob Farrell 2024-05-11 20:52:56 +01:00
parent 055da0f85a
commit 91e0b44404
6 changed files with 133 additions and 45 deletions

View File

@ -82,7 +82,7 @@ pub fn start(self: App, routes_module: type, options: AppOptions) !void {
var log_thread = try std.Thread.spawn(
.{ .allocator = self.allocator },
jetzig.loggers.LogQueue.Reader.publish,
.{server_options.log_queue.reader},
.{ &server_options.log_queue.reader, .{} },
);
defer log_thread.join();

View File

@ -64,7 +64,7 @@ pub fn getServerOptions(self: Environment) !jetzig.http.Server.ServerOptions {
defer options.deinit();
const log_queue = try self.allocator.create(jetzig.loggers.LogQueue);
log_queue.* = try jetzig.loggers.LogQueue.init(self.allocator);
log_queue.* = jetzig.loggers.LogQueue.init(self.allocator);
try log_queue.setFiles(
try getLogFile(.stdout, options.options),
try getLogFile(.stderr, options.options),

View File

@ -78,7 +78,8 @@ pub fn append(self: *Headers, name: []const u8, value: []const u8) !void {
test "append (deprecated)" {
const allocator = std.testing.allocator;
var headers = Headers.init(allocator, try HttpzKeyValue.init(allocator, 10));
var httpz_headers = try HttpzKeyValue.init(allocator, 10);
var headers = Headers.init(allocator, &httpz_headers);
defer headers.deinit();
try headers.append("foo", "bar");
try std.testing.expectEqualStrings(headers.get("foo").?, "bar");
@ -86,7 +87,8 @@ test "append (deprecated)" {
test "add" {
const allocator = std.testing.allocator;
var headers = Headers.init(allocator, try HttpzKeyValue.init(allocator, 10));
var httpz_headers = try HttpzKeyValue.init(allocator, 10);
var headers = Headers.init(allocator, &httpz_headers);
defer headers.deinit();
try headers.append("foo", "bar");
try std.testing.expectEqualStrings(headers.get("foo").?, "bar");
@ -94,7 +96,8 @@ test "add" {
test "get with multiple headers (bugfix regression test)" {
const allocator = std.testing.allocator;
var headers = Headers.init(allocator, try HttpzKeyValue.init(allocator, 10));
var httpz_headers = try HttpzKeyValue.init(allocator, 10);
var headers = Headers.init(allocator, &httpz_headers);
defer headers.deinit();
try headers.append("foo", "bar");
try headers.append("bar", "baz");
@ -103,7 +106,8 @@ test "get with multiple headers (bugfix regression test)" {
test "getAll" {
const allocator = std.testing.allocator;
var headers = Headers.init(allocator, try HttpzKeyValue.init(allocator, 10));
var httpz_headers = try HttpzKeyValue.init(allocator, 10);
var headers = Headers.init(allocator, &httpz_headers);
defer headers.deinit();
try headers.append("foo", "bar");
try headers.append("foo", "baz");
@ -115,7 +119,8 @@ test "getAll" {
test "add too many headers" {
const allocator = std.testing.allocator;
var headers = Headers.init(allocator, try HttpzKeyValue.init(allocator, 10));
var httpz_headers = try HttpzKeyValue.init(allocator, 10);
var headers = Headers.init(allocator, &httpz_headers);
defer headers.deinit();
for (0..10) |_| try headers.append("foo", "bar");
@ -124,7 +129,8 @@ test "add too many headers" {
test "case-insensitive matching" {
const allocator = std.testing.allocator;
var headers = Headers.init(allocator, try HttpzKeyValue.init(allocator, 10));
var httpz_headers = try HttpzKeyValue.init(allocator, 10);
var headers = Headers.init(allocator, &httpz_headers);
defer headers.deinit();
try headers.append("Content-Type", "bar");
try std.testing.expectEqualStrings(headers.get("content-type").?, "bar");

View File

@ -51,7 +51,7 @@ pub fn log(
const level_formatted = if (colorized) colorizedLogLevel(level) else @tagName(level);
const target = jetzig.loggers.logTarget(level);
try self.log_queue.writer.print(
try self.log_queue.print(
"{s: >5} [{s}] {s}\n",
.{ level_formatted, iso8601, output },
target,
@ -87,7 +87,7 @@ pub fn logRequest(self: DevelopmentLogger, request: *const jetzig.http.Request)
var timestamp_buf: [256]u8 = undefined;
const iso8601 = try timestamp.iso8601(&timestamp_buf);
try self.log_queue.writer.print("{s: >5} [{s}] [{s}/{s}/{s}] {s}\n", .{
try self.log_queue.print("{s: >5} [{s}] [{s}/{s}/{s}] {s}\n", .{
if (self.stdout_colorized) colorizedLogLevel(.INFO) else @tagName(.INFO),
iso8601,
formatted_duration,

View File

@ -59,7 +59,7 @@ pub fn log(
const json = try std.json.stringifyAlloc(self.allocator, log_message, .{ .whitespace = .minified });
defer self.allocator.free(json);
try self.log_queue.writer.print("{s}\n", .{json}, jetzig.loggers.logTarget(level));
try self.log_queue.print("{s}\n", .{json}, jetzig.loggers.logTarget(level));
}
/// Log a one-liner including response status code, path, method, duration, etc.
@ -98,7 +98,7 @@ pub fn logRequest(self: *const JsonLogger, request: *const jetzig.http.Request)
}
};
try self.log_queue.writer.print("{s}\n", .{stream.getWritten()}, .stdout);
try self.log_queue.print("{s}\n", .{stream.getWritten()}, .stdout);
}
fn getFile(self: JsonLogger, level: LogLevel) std.fs.File {

View File

@ -2,18 +2,22 @@ const std = @import("std");
const jetzig = @import("../../jetzig.zig");
const List = std.DoublyLinkedList(Event);
const buffer_size = jetzig.config.get(usize, "log_message_buffer_len");
const List = std.DoublyLinkedList(Event);
const Buffer = [buffer_size]u8;
allocator: std.mem.Allocator,
node_allocator: std.heap.MemoryPool(List.Node),
buffer_allocator: std.heap.MemoryPool(Buffer),
list: List,
read_write_mutex: *std.Thread.Mutex,
condition: *std.Thread.Condition,
condition_mutex: *std.Thread.Mutex,
writer: *Writer = undefined,
reader: *Reader = undefined,
read_write_mutex: std.Thread.Mutex,
condition: std.Thread.Condition,
condition_mutex: std.Thread.Mutex,
writer: Writer = undefined,
reader: Reader = undefined,
node_pool: std.ArrayList(*List.Node),
buffer_pool: std.ArrayList(*[buffer_size:0]u8),
buffer_pool: std.ArrayList(*Buffer),
position: usize,
stdout_is_tty: bool = undefined,
stderr_is_tty: bool = undefined,
@ -24,34 +28,51 @@ const LogQueue = @This();
pub const Target = enum { stdout, stderr };
const Event = struct {
message: *[buffer_size:0]u8,
message: *Buffer,
len: usize,
target: Target,
ptr: ?[]const u8,
};
pub fn init(allocator: std.mem.Allocator) !LogQueue {
/// Create a new `LogQueue`.
pub fn init(allocator: std.mem.Allocator) LogQueue {
return .{
.allocator = allocator,
.node_allocator = std.heap.MemoryPool(List.Node).init(allocator),
.buffer_allocator = std.heap.MemoryPool(Buffer).init(allocator),
.list = List{},
.condition = try allocator.create(std.Thread.Condition),
.condition_mutex = try allocator.create(std.Thread.Mutex),
.read_write_mutex = try allocator.create(std.Thread.Mutex),
.condition = std.Thread.Condition{},
.condition_mutex = std.Thread.Mutex{},
.read_write_mutex = std.Thread.Mutex{},
.node_pool = std.ArrayList(*List.Node).init(allocator),
.buffer_pool = std.ArrayList(*[buffer_size:0]u8).init(allocator),
.buffer_pool = std.ArrayList(*Buffer).init(allocator),
.position = 0,
};
}
/// Set the stdout and stderr outputs.
/// Free allocated resources and return to `pending` state.
pub fn deinit(self: *LogQueue) void {
self.node_pool.deinit();
self.buffer_pool.deinit();
while (self.list.popFirst()) |node| {
if (node.data.ptr) |ptr| self.allocator.free(ptr);
self.allocator.destroy(node.data.message);
}
self.buffer_allocator.deinit();
self.node_allocator.deinit();
self.state = .pending;
}
/// Set the stdout and stderr outputs. Must be called before `print`.
pub fn setFiles(self: *LogQueue, stdout_file: std.fs.File, stderr_file: std.fs.File) !void {
self.writer = try self.allocator.create(Writer);
self.writer.* = Writer{
self.writer = Writer{
.queue = self,
.mutex = try self.allocator.create(std.Thread.Mutex),
.mutex = std.Thread.Mutex{},
};
self.reader = try self.allocator.create(Reader);
self.reader.* = Reader{
self.reader = Reader{
.stdout_file = stdout_file,
.stderr_file = stderr_file,
.queue = self,
@ -61,11 +82,17 @@ pub fn setFiles(self: *LogQueue, stdout_file: std.fs.File, stderr_file: std.fs.F
self.state = .ready;
}
pub fn print(self: *LogQueue, comptime message: []const u8, args: anytype, target: Target) !void {
std.debug.assert(self.state == .ready);
try self.writer.print(message, args, target);
}
/// Writer for `LogQueue`. Receives log events and publishes to the queue.
pub const Writer = struct {
queue: *LogQueue,
position: usize = 0,
mutex: *std.Thread.Mutex,
mutex: std.Thread.Mutex,
/// Print a log event. Messages longer than `jetzig.config.get(usize, "log_message_buffer_len")`
/// spill to heap with degraded performance. Adjust buffer length or limit long entries to
@ -77,8 +104,6 @@ pub const Writer = struct {
args: anytype,
target: Target,
) !void {
std.debug.assert(self.queue.state == .ready);
self.mutex.lock();
defer self.mutex.unlock();
@ -86,7 +111,7 @@ pub const Writer = struct {
self.position += 1;
var ptr: ?[]const u8 = null;
const result = std.fmt.bufPrintZ(buf, message, args) catch |err| switch (err) {
const result = std.fmt.bufPrint(buf, message, args) catch |err| switch (err) {
error.NoSpaceLeft => blk: {
ptr = try std.fmt.allocPrint(self.queue.allocator, message, args);
self.position -= 1;
@ -102,9 +127,9 @@ pub const Writer = struct {
});
}
fn getBuffer(self: *Writer) !*[buffer_size:0]u8 {
fn getBuffer(self: *Writer) !*Buffer {
const buffer = if (self.position >= self.queue.buffer_pool.items.len)
try self.queue.allocator.create([buffer_size:0]u8)
try self.queue.buffer_allocator.create()
else
self.queue.buffer_pool.items[self.position];
@ -112,7 +137,8 @@ pub const Writer = struct {
}
};
/// Reader for `LogQueue`. Reads log events from the queue.
/// Reader for `LogQueue`. Reads log events from the queue and writes them to the designated
/// target (stdout or stderr).
pub const Reader = struct {
stdout_file: std.fs.File,
stderr_file: std.fs.File,
@ -120,7 +146,7 @@ pub const Reader = struct {
/// Publish log events from the queue. Invoke from a dedicated thread. Sleeps when log queue
/// is empty, wakes up when a new event is published.
pub fn publish(self: *Reader) !void {
pub fn publish(self: *Reader, options: struct { oneshot: bool = false }) !void {
std.debug.assert(self.queue.state == .ready);
const stdout_writer = self.stdout_file.writer();
@ -130,7 +156,7 @@ pub const Reader = struct {
self.queue.condition_mutex.lock();
defer self.queue.condition_mutex.unlock();
self.queue.condition.wait(self.queue.condition_mutex);
if (!options.oneshot) self.queue.condition.wait(&self.queue.condition_mutex);
var stdout_written = false;
var stderr_written = false;
@ -170,6 +196,8 @@ pub const Reader = struct {
if (stdout_written and !self.queue.stdout_is_tty) try self.stdout_file.sync();
if (stderr_written and !self.queue.stderr_is_tty) try self.stderr_file.sync();
if (options.oneshot) break;
}
}
};
@ -181,7 +209,7 @@ fn append(self: *LogQueue, event: Event) !void {
defer self.read_write_mutex.unlock();
const node = if (self.position >= self.node_pool.items.len)
try self.allocator.create(List.Node)
try self.node_allocator.create()
else
self.node_pool.items[self.position];
@ -212,17 +240,71 @@ fn popFirst(self: *LogQueue) !?Event {
}
}
test "setFiles" {
var log_queue = try LogQueue.init(std.testing.allocator);
test "print to stdout and stderr" {
var log_queue = LogQueue.init(std.testing.allocator);
defer log_queue.deinit();
var tmp_dir = std.testing.tmpDir(.{});
defer tmp_dir.cleanup();
const stdout = tmp_dir.dir.createFile("stdout.log", .{});
const stdout = try tmp_dir.dir.createFile("stdout.log", .{ .read = true });
defer stdout.close();
const stderr = tmp_dir.dir.createFile("stderr.log", .{});
const stderr = try tmp_dir.dir.createFile("stderr.log", .{ .read = true });
defer stderr.close();
try log_queue.setFiles(stdout, stderr);
try log_queue.print("foo {s}\n", .{"bar"}, .stdout);
try log_queue.print("baz {s}\n", .{"qux"}, .stderr);
try log_queue.print("quux {s}\n", .{"corge"}, .stdout);
try log_queue.print("grault {s}\n", .{"garply"}, .stderr);
try log_queue.print("waldo {s}\n", .{"fred"}, .stderr);
try log_queue.print("plugh {s}\n", .{"zyzzy"}, .stdout);
try log_queue.reader.publish(.{ .oneshot = true });
try stdout.seekTo(0);
var buf: [1024]u8 = undefined;
var len = try stdout.readAll(&buf);
try std.testing.expectEqualStrings(
\\foo bar
\\quux corge
\\plugh zyzzy
\\
, buf[0..len]);
try stderr.seekTo(0);
len = try stderr.readAll(&buf);
try std.testing.expectEqualStrings(
\\baz qux
\\grault garply
\\waldo fred
\\
, buf[0..len]);
}
test "long messages" {
var log_queue = LogQueue.init(std.testing.allocator);
defer log_queue.deinit();
var tmp_dir = std.testing.tmpDir(.{});
defer tmp_dir.cleanup();
const stdout = try tmp_dir.dir.createFile("stdout.log", .{ .read = true });
defer stdout.close();
const stderr = try tmp_dir.dir.createFile("stderr.log", .{ .read = true });
defer stderr.close();
try log_queue.setFiles(stdout, stderr);
try log_queue.print("foo" ** buffer_size, .{}, .stdout);
try log_queue.reader.publish(.{ .oneshot = true });
try stdout.seekTo(0);
var buf: [buffer_size * 3]u8 = undefined;
const len = try stdout.readAll(&buf);
try std.testing.expectEqualStrings("foo" ** buffer_size, buf[0..len]);
}