From b7d750c54fcd1431a255627f8cfffd5a9da2a28f Mon Sep 17 00:00:00 2001 From: Bob Farrell Date: Wed, 1 May 2024 18:01:22 +0100 Subject: [PATCH] Formalise JetKV interface for store, cache, jobs Provide configuration for JetKV backend (memory or file allocator). --- build.zig.zon | 12 ++-- demo/src/app/views/cache.zig | 24 +++++++ demo/src/app/views/cache/index.zmpl | 3 + demo/src/app/views/cache/post.zmpl | 3 + demo/src/app/views/kvstore.zig | 30 ++++----- demo/src/app/views/kvstore/index.zmpl | 4 ++ demo/src/main.zig | 39 +++++++++++ src/jetzig.zig | 42 +++++++++++- src/jetzig/App.zig | 42 +++++++++--- src/jetzig/http/Query.zig | 4 +- src/jetzig/http/Request.zig | 97 ++++++++++++++++++--------- src/jetzig/http/Server.zig | 12 +++- src/jetzig/jobs/Job.zig | 18 +++-- src/jetzig/jobs/Pool.zig | 8 +-- src/jetzig/jobs/Worker.zig | 71 ++++++++------------ src/jetzig/kv.zig | 3 + src/jetzig/kv/Store.zig | 94 ++++++++++++++++++++++++++ src/jetzig/views/Route.zig | 2 +- 18 files changed, 383 insertions(+), 125 deletions(-) create mode 100644 demo/src/app/views/cache.zig create mode 100644 demo/src/app/views/cache/index.zmpl create mode 100644 demo/src/app/views/cache/post.zmpl create mode 100644 demo/src/app/views/kvstore/index.zmpl create mode 100644 src/jetzig/kv.zig create mode 100644 src/jetzig/kv/Store.zig diff --git a/build.zig.zon b/build.zig.zon index 2754e5e..e809431 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -7,17 +7,17 @@ .hash = "1220482f07f2bbaef335f20d6890c15a1e14739950b784232bc69182423520e058a5", }, .zmpl = .{ - .url = "https://github.com/jetzig-framework/zmpl/archive/046c05d376a4fe89d86c52596baa18137891cd87.tar.gz", - .hash = "1220d8890b1161e4356d2c59d4b88280566d55480dae840b6f5dae34bf852bef6b56", + .url = "https://github.com/jetzig-framework/zmpl/archive/4457eba50bd2eff3743601aa00adbffebcd4207a.tar.gz", + .hash = "122001d661e534ef59fc20936330bb0c3068c8aaf1b7c60f9dde9e58d9a536918754", + }, + .jetkv = .{ + .url = "https://github.com/jetzig-framework/jetkv/archive/edfda9108c857dd5a04f87ae48667d3ed57612d9.tar.gz", + .hash = "122015e8a911a7e3a519bbf09c5ef932b569428f6eb6946417df5747a2e76b2a09b0", }, .args = .{ .url = "https://github.com/MasterQ32/zig-args/archive/01d72b9a0128c474aeeb9019edd48605fa6d95f7.tar.gz", .hash = "12208a1de366740d11de525db7289345949f5fd46527db3f89eecc7bb49b012c0732", }, - .jetkv = .{ - .url = "https://github.com/jetzig-framework/jetkv/archive/a6fcc2df220c1a40094e167eeb567bb5888404e9.tar.gz", - .hash = "12207bd2d7465b33e745a5b0567172377f94a221d1fc9aab238bb1b372c64f4ec1a0", - }, .smtp_client = .{ .url = "https://github.com/karlseguin/smtp_client.zig/archive/e79e411862d4f4d41657bf41efb884efca3d67dd.tar.gz", .hash = "12209907c69891a38e6923308930ac43bfb40135bc609ea370b5759fc2e1c4f57284", diff --git a/demo/src/app/views/cache.zig b/demo/src/app/views/cache.zig new file mode 100644 index 0000000..5041502 --- /dev/null +++ b/demo/src/app/views/cache.zig @@ -0,0 +1,24 @@ +const std = @import("std"); +const jetzig = @import("jetzig"); + +pub fn index(request: *jetzig.Request, data: *jetzig.Data) !jetzig.View { + var root = try data.object(); + try root.put("cached_value", try request.cache.get("example")); + + return request.render(.ok); +} + +pub fn post(request: *jetzig.Request, data: *jetzig.Data) !jetzig.View { + var root = try data.object(); + + const params = try request.params(); + + if (params.get("message")) |message| { + try request.cache.put("message", message); + try root.put("message", message); + } else { + try root.put("message", data.string("[no message param detected]")); + } + + return request.render(.ok); +} diff --git a/demo/src/app/views/cache/index.zmpl b/demo/src/app/views/cache/index.zmpl new file mode 100644 index 0000000..a74c6ac --- /dev/null +++ b/demo/src/app/views/cache/index.zmpl @@ -0,0 +1,3 @@ +
+ Cached value: {{.cached_value}} +
diff --git a/demo/src/app/views/cache/post.zmpl b/demo/src/app/views/cache/post.zmpl new file mode 100644 index 0000000..76efa81 --- /dev/null +++ b/demo/src/app/views/cache/post.zmpl @@ -0,0 +1,3 @@ +
+ Value "{{.message}}" added to cache +
diff --git a/demo/src/app/views/kvstore.zig b/demo/src/app/views/kvstore.zig index 33c1a85..b86a024 100644 --- a/demo/src/app/views/kvstore.zig +++ b/demo/src/app/views/kvstore.zig @@ -6,29 +6,23 @@ pub fn index(request: *jetzig.Request, data: *jetzig.Data) !jetzig.View { // Fetch a string from the KV store. If it exists, store it in the root data object, // otherwise store a string value to be picked up by the next request. - if (request.kvGet(.string, "example-key")) |capture| { - try root.put("stored_string", data.string(capture)); + if (try request.store.fetchRemove("example-key")) |capture| { + try root.put("stored_string", capture); } else { - try request.kvPut(.string, "example-key", "example-value"); + try root.put("stored_string", null); + try request.store.put("example-key", data.string("example-value")); } - // Pop an item from the array and store it in the root data object. This will empty the + // Left-pop an item from the array and store it in the root data object. This will empty the // array after multiple requests. - if (request.kvPop("example-array")) |string| try root.put("popped", data.string(string)); - - // Fetch an array from the KV store. If it exists, store its values in the root data object, - // otherwise store a new array to be picked up by the next request. - if (request.kvGet(.array, "example-array")) |kv_array| { - var array = try data.array(); - for (kv_array.items()) |item| try array.append(data.string(item)); - try root.put("stored_array", array); + if (try request.store.popFirst("example-array")) |value| { + try root.put("popped", value); } else { - // Create a KV Array and store it in the key value store. - var kv_array = request.kvArray(); - try kv_array.append("hello"); - try kv_array.append("goodbye"); - try kv_array.append("hello again"); - try request.kvPut(.array, "example-array", kv_array); + try root.put("popped", null); + // Store some values in an array in the KV store. + try request.store.append("example-array", data.string("hello")); + try request.store.append("example-array", data.string("goodbye")); + try request.store.append("example-array", data.string("hello again")); } return request.render(.ok); diff --git a/demo/src/app/views/kvstore/index.zmpl b/demo/src/app/views/kvstore/index.zmpl new file mode 100644 index 0000000..8234d29 --- /dev/null +++ b/demo/src/app/views/kvstore/index.zmpl @@ -0,0 +1,4 @@ +
Stored string: {{.stored_string}}
+
Left-popped array value: {{.popped}}
+ +
Refresh this page to cycle through values
diff --git a/demo/src/main.zig b/demo/src/main.zig index 702b519..08213d1 100644 --- a/demo/src/main.zig +++ b/demo/src/main.zig @@ -41,6 +41,45 @@ pub const jetzig_options = struct { // milliseconds. // pub const job_worker_sleep_interval_ms: usize = 10; + /// Key-value store options. Set backend to `.file` to use a file-based store. + /// When using `.file` backend, you must also set `.file_options`. + /// The key-value store is exposed as `request.store` in views and is also available in as + /// `env.store` in all jobs/mailers. + pub const store: jetzig.kv.Store.KVOptions = .{ + .backend = .memory, + // .backend = .file, + // .file_options = .{ + // .path = "/path/to/jetkv-store.db", + // .truncate = false, // Set to `true` to clear the store on each server launch. + // .address_space_size = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096), + // }, + }; + + /// Job queue options. Identical to `store` options, but allows using different + /// backends (e.g. `.memory` for key-value store, `.file` for jobs queue. + /// The job queue is managed internally by Jetzig. + pub const job_queue: jetzig.kv.Store.KVOptions = .{ + .backend = .memory, + // .backend = .file, + // .file_options = .{ + // .path = "/path/to/jetkv-queue.db", + // .truncate = false, // Set to `true` to clear the store on each server launch. + // .address_space_size = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096), + // }, + }; + + /// Cache options. Identical to `store` options, but allows using different + /// backends (e.g. `.memory` for key-value store, `.file` for cache. + pub const cache: jetzig.kv.Store.KVOptions = .{ + .backend = .memory, + // .backend = .file, + // .file_options = .{ + // .path = "/path/to/jetkv-cache.db", + // .truncate = false, // Set to `true` to clear the store on each server launch. + // .address_space_size = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096), + // }, + }; + /// SMTP configuration for Jetzig Mail. It is recommended to use a local SMTP relay, /// e.g.: https://github.com/juanluisbaptiste/docker-postfix // pub const smtp: jetzig.mail.SMTPConfig = .{ diff --git a/src/jetzig.zig b/src/jetzig.zig index 893b58d..4d18546 100644 --- a/src/jetzig.zig +++ b/src/jetzig.zig @@ -15,6 +15,7 @@ pub const types = @import("jetzig/types.zig"); pub const markdown = @import("jetzig/markdown.zig"); pub const jobs = @import("jetzig/jobs.zig"); pub const mail = @import("jetzig/mail.zig"); +pub const kv = @import("jetzig/kv.zig"); /// The primary interface for a Jetzig application. Create an `App` in your application's /// `src/main.zig` and call `start` to launch the application. @@ -99,6 +100,45 @@ pub const config = struct { /// milliseconds. pub const job_worker_sleep_interval_ms: usize = 10; + /// Key-value store options. Set backend to `.file` to use a file-based store. + /// When using `.file` backend, you must also set `.file_options`. + /// The key-value store is exposed as `request.store` in views and is also available in as + /// `env.store` in all jobs/mailers. + pub const store: kv.Store.KVOptions = .{ + .backend = .memory, + // .backend = .file, + // .file_options = .{ + // .path = "/path/to/jetkv-store.db", + // .truncate = false, // Set to `true` to clear the store on each server launch. + // .address_space_size = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096), + // }, + }; + + /// Job queue options. Identical to `store` options, but allows using different + /// backends (e.g. `.memory` for key-value store, `.file` for jobs queue. + /// The job queue is managed internally by Jetzig. + pub const job_queue: kv.Store.KVOptions = .{ + .backend = .memory, + // .backend = .file, + // .file_options = .{ + // .path = "/path/to/jetkv-queue.db", + // .truncate = false, // Set to `true` to clear the store on each server launch. + // .address_space_size = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096), + // }, + }; + + /// Cache. Identical to `store` options, but allows using different + /// backends (e.g. `.memory` for key-value store, `.file` for cache. + pub const cache: kv.Store.KVOptions = .{ + .backend = .memory, + // .backend = .file, + // .file_options = .{ + // .path = "/path/to/jetkv-cache.db", + // .truncate = false, // Set to `true` to clear the store on each server launch. + // .address_space_size = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096), + // }, + }; + /// SMTP configuration for Jetzig Mail. pub const smtp: mail.SMTPConfig = .{ .port = 25, @@ -133,7 +173,7 @@ pub fn init(allocator: std.mem.Allocator) !App { const environment = Environment.init(allocator); return .{ - .server_options = try environment.getServerOptions(), + .environment = environment, .allocator = allocator, }; } diff --git a/src/jetzig/App.zig b/src/jetzig/App.zig index 3a9f449..59af1df 100644 --- a/src/jetzig/App.zig +++ b/src/jetzig/App.zig @@ -7,7 +7,7 @@ const mime_types = @import("mime_types").mime_types; // Generated at build time. const App = @This(); -server_options: jetzig.http.Server.ServerOptions, +environment: jetzig.Environment, allocator: std.mem.Allocator, pub fn deinit(self: App) void { @@ -57,9 +57,29 @@ pub fn start(self: App, routes_module: type, options: AppOptions) !void { self.allocator.destroy(route); }; - var jet_kv = jetzig.jetkv.JetKV.init(self.allocator, .{}); + var store = try jetzig.kv.Store.init( + self.allocator, + jetzig.config.get(jetzig.kv.Store.KVOptions, "store"), + ); + defer store.deinit(); - if (self.server_options.detach) { + var job_queue = try jetzig.kv.Store.init( + self.allocator, + jetzig.config.get(jetzig.kv.Store.KVOptions, "job_queue"), + ); + defer job_queue.deinit(); + + var cache = try jetzig.kv.Store.init( + self.allocator, + jetzig.config.get(jetzig.kv.Store.KVOptions, "cache"), + ); + defer cache.deinit(); + + const server_options = try self.environment.getServerOptions(); + defer self.allocator.free(server_options.bind); + defer self.allocator.free(server_options.secret); + + if (server_options.detach) { const argv = try std.process.argsAlloc(self.allocator); defer std.process.argsFree(self.allocator, argv); var child_argv = std.ArrayList([]const u8).init(self.allocator); @@ -76,24 +96,30 @@ pub fn start(self: App, routes_module: type, options: AppOptions) !void { var server = jetzig.http.Server.init( self.allocator, - self.server_options, + server_options, routes.items, &routes_module.jobs, &routes_module.mailers, &mime_map, - &jet_kv, + &store, + &job_queue, + &cache, ); defer server.deinit(); + var mutex = std.Thread.Mutex{}; var worker_pool = jetzig.jobs.Pool.init( self.allocator, - &jet_kv, + &job_queue, .{ .logger = server.logger, .environment = server.options.environment, .routes = routes.items, .jobs = &routes_module.jobs, .mailers = &routes_module.mailers, + .store = &store, + .cache = &cache, + .mutex = &mutex, }, ); defer worker_pool.deinit(); @@ -108,13 +134,13 @@ pub fn start(self: App, routes_module: type, options: AppOptions) !void { error.AddressInUse => { try server.logger.ERROR( "Socket unavailable: {s}:{} - unable to start server.\n", - .{ self.server_options.bind, self.server_options.port }, + .{ server_options.bind, server_options.port }, ); return; }, else => { try server.logger.ERROR("Encountered error: {}\nExiting.\n", .{err}); - return err; + std.process.exit(1); }, } }; diff --git a/src/jetzig/http/Query.zig b/src/jetzig/http/Query.zig index 0e59da1..b1c07c4 100644 --- a/src/jetzig/http/Query.zig +++ b/src/jetzig/http/Query.zig @@ -109,10 +109,10 @@ fn mappingParam(input: []const u8) ?struct { key: []const u8, field: []const u8 fn dataValue(self: Query, value: ?[]const u8) *jetzig.data.Data.Value { if (value) |item_value| { - const duped = self.data.getAllocator().dupe(u8, item_value) catch @panic("OOM"); + const duped = self.data.allocator().dupe(u8, item_value) catch @panic("OOM"); return self.data.string(uriDecode(duped)); } else { - return jetzig.zmpl.Data._null(self.data.getAllocator()); + return jetzig.zmpl.Data._null(self.data.allocator()); } } diff --git a/src/jetzig/http/Request.zig b/src/jetzig/http/Request.zig index 05976cf..a0f7b77 100644 --- a/src/jetzig/http/Request.zig +++ b/src/jetzig/http/Request.zig @@ -31,6 +31,60 @@ redirected: bool = false, rendered_multiple: bool = false, rendered_view: ?jetzig.views.View = null, start_time: i128, +store: ArenaStore, +cache: ArenaStore, + +/// Wrapper for KV store that uses the request's arena allocator for fetching values. +pub const ArenaStore = struct { + allocator: std.mem.Allocator, + store: *jetzig.kv.Store, + + /// Put a String or into the key-value store. + pub fn get(self: ArenaStore, key: []const u8) !?*jetzig.data.Value { + return try self.store.get(try self.data(), key); + } + + /// Get a String from the store. + pub fn put(self: ArenaStore, key: []const u8, value: *jetzig.data.Value) !void { + try self.store.put(key, value); + } + + /// Remove a String to from the key-value store and return it if found. + pub fn fetchRemove(self: ArenaStore, key: []const u8) !?*jetzig.data.Value { + return try self.store.fetchRemove(try self.data(), key); + } + + /// Remove a String to from the key-value store. + pub fn remove(self: ArenaStore, key: []const u8) !void { + try self.store.remove(key); + } + + /// Append a Value to the end of an Array in the key-value store. + pub fn append(self: ArenaStore, key: []const u8, value: *jetzig.data.Value) !void { + try self.store.append(key, value); + } + + /// Prepend a Value to the start of an Array in the key-value store. + pub fn prepend(self: ArenaStore, key: []const u8, value: *jetzig.data.Value) !void { + try self.store.prepend(key, value); + } + + /// Pop a String from an Array in the key-value store. + pub fn pop(self: ArenaStore, key: []const u8) !?*jetzig.data.Value { + return try self.store.pop(try self.data(), key); + } + + /// Left-pop a String from an Array in the key-value store. + pub fn popFirst(self: ArenaStore, key: []const u8) !?*jetzig.data.Value { + return try self.store.popFirst(try self.data(), key); + } + + fn data(self: ArenaStore) !*jetzig.data.Data { + const arena_data = try self.allocator.create(jetzig.data.Data); + arena_data.* = jetzig.data.Data.init(self.allocator); + return arena_data; + } +}; pub fn init( allocator: std.mem.Allocator, @@ -65,11 +119,14 @@ pub fn init( .response_data = response_data, .std_http_request = std_http_request, .start_time = start_time, + .store = .{ .store = server.store, .allocator = allocator }, + .cache = .{ .store = server.cache, .allocator = allocator }, }; } pub fn deinit(self: *Request) void { - // self.session.deinit(); + self.session.deinit(); + self.cookies.deinit(); self.allocator.destroy(self.cookies); self.allocator.destroy(self.session); if (self.processed) self.allocator.free(self.body); @@ -275,37 +332,6 @@ fn parseQuery(self: *Request) !*jetzig.data.Value { return self.query_body.?.data.value.?; } -/// Put a String or Array into the key-value store. -/// `T` can be either `jetzig.KVString` or `jetzig.KVArray` -pub fn kvPut( - self: *Request, - comptime value_type: jetzig.jetkv.value_types, - key: jetzig.jetkv.types.String, - value: jetzig.jetkv.ValueType(value_type), -) !void { - try self.server.jet_kv.put(value_type, key, value); -} - -/// Get a String or Array from the key-value store. -/// `T` can be either `jetzig.KVString` or `jetzig.KVArray` -pub fn kvGet( - self: *Request, - comptime value_type: jetzig.jetkv.value_types, - key: jetzig.jetkv.types.String, -) ?jetzig.jetkv.ValueType(value_type) { - return self.server.jet_kv.get(value_type, key); -} - -/// Pop a String from an Array in the key-value store. -pub fn kvPop(self: *Request, key: jetzig.jetkv.types.String) ?jetzig.jetkv.types.String { - return self.server.jet_kv.pop(key); -} - -/// Return a new Array suitable for use in the KV store. -pub fn kvArray(self: Request) jetzig.jetkv.types.Array { - return jetzig.jetkv.types.Array.init(self.allocator); -} - /// Creates a new Job. Receives a job name which must resolve to `src/app/jobs/.zig` /// Call `Job.put(...)` to set job params. /// Call `Job.background()` to run the job outside of the request/response flow. @@ -322,7 +348,9 @@ pub fn job(self: *Request, job_name: []const u8) !*jetzig.Job { const background_job = try self.allocator.create(jetzig.Job); background_job.* = jetzig.Job.init( self.allocator, - self.server.jet_kv, + self.server.store, + self.server.job_queue, + self.server.cache, self.server.logger, self.server.job_definitions, job_name, @@ -375,6 +403,9 @@ const RequestMail = struct { .routes = self.request.server.routes, .mailers = self.request.server.mailer_definitions, .jobs = self.request.server.job_definitions, + .store = self.request.server.store, + .cache = self.request.server.cache, + .mutex = undefined, }, ), } diff --git a/src/jetzig/http/Server.zig b/src/jetzig/http/Server.zig index 2155546..9434bd6 100644 --- a/src/jetzig/http/Server.zig +++ b/src/jetzig/http/Server.zig @@ -22,7 +22,9 @@ mailer_definitions: []const jetzig.MailerDefinition, mime_map: *jetzig.http.mime.MimeMap, std_net_server: std.net.Server = undefined, initialized: bool = false, -jet_kv: *jetzig.jetkv.JetKV, +store: *jetzig.kv.Store, +job_queue: *jetzig.kv.Store, +cache: *jetzig.kv.Store, const Server = @This(); @@ -33,7 +35,9 @@ pub fn init( job_definitions: []const jetzig.JobDefinition, mailer_definitions: []const jetzig.MailerDefinition, mime_map: *jetzig.http.mime.MimeMap, - jet_kv: *jetzig.jetkv.JetKV, + store: *jetzig.kv.Store, + job_queue: *jetzig.kv.Store, + cache: *jetzig.kv.Store, ) Server { return .{ .allocator = allocator, @@ -43,7 +47,9 @@ pub fn init( .job_definitions = job_definitions, .mailer_definitions = mailer_definitions, .mime_map = mime_map, - .jet_kv = jet_kv, + .store = store, + .job_queue = job_queue, + .cache = cache, }; } diff --git a/src/jetzig/jobs/Job.zig b/src/jetzig/jobs/Job.zig index 6bf8376..f6f9b08 100644 --- a/src/jetzig/jobs/Job.zig +++ b/src/jetzig/jobs/Job.zig @@ -13,10 +13,15 @@ pub const JobEnv = struct { routes: []*const jetzig.Route, mailers: []const jetzig.MailerDefinition, jobs: []const jetzig.JobDefinition, + store: *jetzig.kv.Store, + cache: *jetzig.kv.Store, + mutex: *std.Thread.Mutex, }; allocator: std.mem.Allocator, -jet_kv: *jetzig.jetkv.JetKV, +store: *jetzig.kv.Store, +job_queue: *jetzig.kv.Store, +cache: *jetzig.kv.Store, logger: jetzig.loggers.Logger, name: []const u8, definition: ?JobDefinition, @@ -28,7 +33,9 @@ const Job = @This(); /// Initialize a new Job pub fn init( allocator: std.mem.Allocator, - jet_kv: *jetzig.jetkv.JetKV, + store: *jetzig.kv.Store, + job_queue: *jetzig.kv.Store, + cache: *jetzig.kv.Store, logger: jetzig.loggers.Logger, jobs: []const JobDefinition, name: []const u8, @@ -47,7 +54,9 @@ pub fn init( return .{ .allocator = allocator, - .jet_kv = jet_kv, + .store = store, + .job_queue = job_queue, + .cache = cache, .logger = logger, .name = name, .definition = definition, @@ -65,7 +74,6 @@ pub fn deinit(self: *Job) void { /// Add a Job to the queue pub fn schedule(self: *Job) !void { try self.params.put("__jetzig_job_name", self.data.string(self.name)); - const json = try self.data.toJson(); - try self.jet_kv.prepend("__jetzig_jobs", json); + try self.job_queue.append("__jetzig_jobs", self.data.value.?); try self.logger.INFO("Scheduled job: {s}", .{self.name}); } diff --git a/src/jetzig/jobs/Pool.zig b/src/jetzig/jobs/Pool.zig index d40fd52..caa9e85 100644 --- a/src/jetzig/jobs/Pool.zig +++ b/src/jetzig/jobs/Pool.zig @@ -5,7 +5,7 @@ const jetzig = @import("../../jetzig.zig"); const Pool = @This(); allocator: std.mem.Allocator, -jet_kv: *jetzig.jetkv.JetKV, +job_queue: *jetzig.kv.Store, job_env: jetzig.jobs.JobEnv, pool: std.Thread.Pool = undefined, workers: std.ArrayList(*jetzig.jobs.Worker), @@ -13,12 +13,12 @@ workers: std.ArrayList(*jetzig.jobs.Worker), /// Initialize a new worker thread pool. pub fn init( allocator: std.mem.Allocator, - jet_kv: *jetzig.jetkv.JetKV, + job_queue: *jetzig.kv.Store, job_env: jetzig.jobs.JobEnv, ) Pool { return .{ .allocator = allocator, - .jet_kv = jet_kv, + .job_queue = job_queue, .job_env = job_env, .workers = std.ArrayList(*jetzig.jobs.Worker).init(allocator), }; @@ -42,7 +42,7 @@ pub fn work(self: *Pool, threads: usize, interval: usize) !void { self.allocator, self.job_env, index, - self.jet_kv, + self.job_queue, interval, ); try self.workers.append(worker); diff --git a/src/jetzig/jobs/Worker.zig b/src/jetzig/jobs/Worker.zig index 5455ef1..11bbe96 100644 --- a/src/jetzig/jobs/Worker.zig +++ b/src/jetzig/jobs/Worker.zig @@ -6,21 +6,21 @@ const Worker = @This(); allocator: std.mem.Allocator, job_env: jetzig.jobs.JobEnv, id: usize, -jet_kv: *jetzig.jetkv.JetKV, +job_queue: *jetzig.kv.Store, interval: usize, pub fn init( allocator: std.mem.Allocator, job_env: jetzig.jobs.JobEnv, id: usize, - jet_kv: *jetzig.jetkv.JetKV, + job_queue: *jetzig.kv.Store, interval: usize, ) Worker { return .{ .allocator = allocator, .job_env = job_env, .id = id, - .jet_kv = jet_kv, + .job_queue = job_queue, .interval = interval * 1000 * 1000, // millisecond => nanosecond }; } @@ -30,10 +30,16 @@ pub fn work(self: *const Worker) void { self.log(.INFO, "[worker-{}] Job worker started.", .{self.id}); while (true) { - if (self.jet_kv.pop("__jetzig_jobs")) |json| { - defer self.allocator.free(json); - if (self.matchJob(json)) |job_definition| { - self.processJob(job_definition, json); + var data = jetzig.data.Data.init(self.allocator); + defer data.deinit(); + const maybe_value = self.job_queue.popFirst(&data, "__jetzig_jobs") catch |err| blk: { + self.log(.ERROR, "Error fetching job from queue: {s}", .{@errorName(err)}); + break :blk null; // FIXME: Probably close thread here ? + }; + + if (maybe_value) |value| { + if (self.matchJob(value)) |job_definition| { + self.processJob(job_definition, value); } } else { std.time.sleep(self.interval); @@ -44,27 +50,19 @@ pub fn work(self: *const Worker) void { } // Do a minimal parse of JSON job data to identify job name, then match on known job definitions. -fn matchJob(self: Worker, json: []const u8) ?jetzig.jobs.JobDefinition { - const parsed_json = std.json.parseFromSlice( - struct { __jetzig_job_name: []const u8 }, - self.allocator, - json, - .{ .ignore_unknown_fields = true }, - ) catch |err| { +fn matchJob(self: Worker, value: *const jetzig.data.Value) ?jetzig.jobs.JobDefinition { + const job_name = value.getT(.string, "__jetzig_job_name") orelse { self.log( .ERROR, - "[worker-{}] Error parsing JSON from job queue: {s}", - .{ self.id, @errorName(err) }, + "[worker-{}] Missing expected job name field `__jetzig_job_name`", + .{self.id}, ); return null; }; - const job_name = parsed_json.value.__jetzig_job_name; - // TODO: Hashmap for (self.job_env.jobs) |job_definition| { if (std.mem.eql(u8, job_definition.name, job_name)) { - parsed_json.deinit(); return job_definition; } } else { @@ -75,34 +73,19 @@ fn matchJob(self: Worker, json: []const u8) ?jetzig.jobs.JobDefinition { // Fully parse JSON job data and invoke the defined job's run function, passing the parsed params // as a `*jetzig.data.Value`. -fn processJob(self: Worker, job_definition: jetzig.JobDefinition, json: []const u8) void { - var data = jetzig.data.Data.init(self.allocator); - defer data.deinit(); - - data.fromJson(json) catch |err| { - self.log( - .INFO, - "[worker-{}] Error parsing JSON for job `{s}`: {s}", - .{ self.id, job_definition.name, @errorName(err) }, - ); - }; - +fn processJob(self: Worker, job_definition: jetzig.JobDefinition, params: *jetzig.data.Value) void { var arena = std.heap.ArenaAllocator.init(self.allocator); defer arena.deinit(); - if (data.value) |params| { - job_definition.runFn(arena.allocator(), params, self.job_env) catch |err| { - self.log( - .ERROR, - "[worker-{}] Encountered error processing job `{s}`: {s}", - .{ self.id, job_definition.name, @errorName(err) }, - ); - return; - }; - self.log(.INFO, "[worker-{}] Job completed: {s}", .{ self.id, job_definition.name }); - } else { - self.log(.ERROR, "Error in job params definition for job: {s}", .{job_definition.name}); - } + job_definition.runFn(arena.allocator(), params, self.job_env) catch |err| { + self.log( + .ERROR, + "[worker-{}] Encountered error processing job `{s}`: {s}", + .{ self.id, job_definition.name, @errorName(err) }, + ); + return; + }; + self.log(.INFO, "[worker-{}] Job completed: {s}", .{ self.id, job_definition.name }); } // Log with error handling and fallback. Prefix with worker ID. diff --git a/src/jetzig/kv.zig b/src/jetzig/kv.zig new file mode 100644 index 0000000..96e3052 --- /dev/null +++ b/src/jetzig/kv.zig @@ -0,0 +1,3 @@ +const std = @import("std"); + +pub const Store = @import("kv/Store.zig"); diff --git a/src/jetzig/kv/Store.zig b/src/jetzig/kv/Store.zig new file mode 100644 index 0000000..cddd1bb --- /dev/null +++ b/src/jetzig/kv/Store.zig @@ -0,0 +1,94 @@ +const std = @import("std"); +const jetzig = @import("../../jetzig.zig"); + +const Store = @This(); + +store: jetzig.jetkv.JetKV, +options: KVOptions, + +pub const KVOptions = struct { + backend: enum { memory, file } = .memory, + file_options: struct { + path: ?[]const u8 = null, + address_space_size: u32 = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096), + truncate: bool = false, + } = .{}, +}; + +const ValueType = enum { string, array }; + +/// Initialize a new memory or file store. +pub fn init(allocator: std.mem.Allocator, options: KVOptions) !Store { + const store = try jetzig.jetkv.JetKV.init( + allocator, + switch (options.backend) { + .file => .{ + .backend = .file, + .file_backend_options = .{ + .path = options.file_options.path, + .address_space_size = options.file_options.address_space_size, + .truncate = options.file_options.truncate, + }, + }, + .memory => .{ + .backend = .memory, + }, + }, + ); + + return .{ .store = store, .options = options }; +} + +/// Free allocated resources/close database file. +pub fn deinit(self: *Store) void { + self.store.deinit(); +} + +/// Put a Value or into the key-value store. +pub fn put(self: *Store, key: []const u8, value: *jetzig.data.Value) !void { + try self.store.put(key, try value.toJson()); +} + +/// Get a Value from the store. +pub fn get(self: *Store, data: *jetzig.data.Data, key: []const u8) !?*jetzig.data.Value { + return try parseValue(data, try self.store.get(data.allocator(), key)); +} + +/// Remove a Value to from the key-value store and return it if found. +pub fn fetchRemove(self: *Store, data: *jetzig.data.Data, key: []const u8) !?*jetzig.data.Value { + return try parseValue(data, try self.store.fetchRemove(data.allocator(), key)); +} + +/// Remove a Value to from the key-value store. +pub fn remove(self: *Store, key: []const u8) !void { + try self.store.remove(key); +} + +/// Append a Value to the end of an Array in the key-value store. +pub fn append(self: *Store, key: []const u8, value: *const jetzig.data.Value) !void { + try self.store.append(key, try value.toJson()); +} + +/// Prepend a Value to the start of an Array in the key-value store. +pub fn prepend(self: *Store, key: []const u8, value: *const jetzig.data.Value) !void { + try self.store.prepend(key, try value.toJson()); +} + +/// Pop a Value from an Array in the key-value store. +pub fn pop(self: *Store, data: *jetzig.data.Data, key: []const u8) !?*jetzig.data.Value { + return try parseValue(data, try self.store.pop(data.allocator(), key)); +} + +/// Left-pop a Value from an Array in the key-value store. +pub fn popFirst(self: *Store, data: *jetzig.data.Data, key: []const u8) !?*jetzig.data.Value { + return try parseValue(data, try self.store.popFirst(data.allocator(), key)); +} + +fn parseValue(data: *jetzig.data.Data, maybe_json: ?[]const u8) !?*jetzig.data.Value { + if (maybe_json) |json| { + try data.fromJson(json); + return data.value.?; + } else { + return null; + } +} diff --git a/src/jetzig/views/Route.zig b/src/jetzig/views/Route.zig index 38aceef..ae903f6 100644 --- a/src/jetzig/views/Route.zig +++ b/src/jetzig/views/Route.zig @@ -65,7 +65,7 @@ pub fn initParams(self: *Route, allocator: std.mem.Allocator) !void { pub fn deinitParams(self: *const Route) void { for (self.params.items) |data| { data.deinit(); - data._allocator.destroy(data); + data.parent_allocator.destroy(data); } self.params.deinit(); }