diff --git a/build.zig.zon b/build.zig.zon
index 2754e5e..e809431 100644
--- a/build.zig.zon
+++ b/build.zig.zon
@@ -7,17 +7,17 @@
.hash = "1220482f07f2bbaef335f20d6890c15a1e14739950b784232bc69182423520e058a5",
},
.zmpl = .{
- .url = "https://github.com/jetzig-framework/zmpl/archive/046c05d376a4fe89d86c52596baa18137891cd87.tar.gz",
- .hash = "1220d8890b1161e4356d2c59d4b88280566d55480dae840b6f5dae34bf852bef6b56",
+ .url = "https://github.com/jetzig-framework/zmpl/archive/4457eba50bd2eff3743601aa00adbffebcd4207a.tar.gz",
+ .hash = "122001d661e534ef59fc20936330bb0c3068c8aaf1b7c60f9dde9e58d9a536918754",
+ },
+ .jetkv = .{
+ .url = "https://github.com/jetzig-framework/jetkv/archive/edfda9108c857dd5a04f87ae48667d3ed57612d9.tar.gz",
+ .hash = "122015e8a911a7e3a519bbf09c5ef932b569428f6eb6946417df5747a2e76b2a09b0",
},
.args = .{
.url = "https://github.com/MasterQ32/zig-args/archive/01d72b9a0128c474aeeb9019edd48605fa6d95f7.tar.gz",
.hash = "12208a1de366740d11de525db7289345949f5fd46527db3f89eecc7bb49b012c0732",
},
- .jetkv = .{
- .url = "https://github.com/jetzig-framework/jetkv/archive/a6fcc2df220c1a40094e167eeb567bb5888404e9.tar.gz",
- .hash = "12207bd2d7465b33e745a5b0567172377f94a221d1fc9aab238bb1b372c64f4ec1a0",
- },
.smtp_client = .{
.url = "https://github.com/karlseguin/smtp_client.zig/archive/e79e411862d4f4d41657bf41efb884efca3d67dd.tar.gz",
.hash = "12209907c69891a38e6923308930ac43bfb40135bc609ea370b5759fc2e1c4f57284",
diff --git a/demo/src/app/views/cache.zig b/demo/src/app/views/cache.zig
new file mode 100644
index 0000000..5041502
--- /dev/null
+++ b/demo/src/app/views/cache.zig
@@ -0,0 +1,24 @@
+const std = @import("std");
+const jetzig = @import("jetzig");
+
+pub fn index(request: *jetzig.Request, data: *jetzig.Data) !jetzig.View {
+ var root = try data.object();
+ try root.put("cached_value", try request.cache.get("example"));
+
+ return request.render(.ok);
+}
+
+pub fn post(request: *jetzig.Request, data: *jetzig.Data) !jetzig.View {
+ var root = try data.object();
+
+ const params = try request.params();
+
+ if (params.get("message")) |message| {
+ try request.cache.put("message", message);
+ try root.put("message", message);
+ } else {
+ try root.put("message", data.string("[no message param detected]"));
+ }
+
+ return request.render(.ok);
+}
diff --git a/demo/src/app/views/cache/index.zmpl b/demo/src/app/views/cache/index.zmpl
new file mode 100644
index 0000000..a74c6ac
--- /dev/null
+++ b/demo/src/app/views/cache/index.zmpl
@@ -0,0 +1,3 @@
+
+ Cached value: {{.cached_value}}
+
diff --git a/demo/src/app/views/cache/post.zmpl b/demo/src/app/views/cache/post.zmpl
new file mode 100644
index 0000000..76efa81
--- /dev/null
+++ b/demo/src/app/views/cache/post.zmpl
@@ -0,0 +1,3 @@
+
+ Value "{{.message}}" added to cache
+
diff --git a/demo/src/app/views/kvstore.zig b/demo/src/app/views/kvstore.zig
index 33c1a85..b86a024 100644
--- a/demo/src/app/views/kvstore.zig
+++ b/demo/src/app/views/kvstore.zig
@@ -6,29 +6,23 @@ pub fn index(request: *jetzig.Request, data: *jetzig.Data) !jetzig.View {
// Fetch a string from the KV store. If it exists, store it in the root data object,
// otherwise store a string value to be picked up by the next request.
- if (request.kvGet(.string, "example-key")) |capture| {
- try root.put("stored_string", data.string(capture));
+ if (try request.store.fetchRemove("example-key")) |capture| {
+ try root.put("stored_string", capture);
} else {
- try request.kvPut(.string, "example-key", "example-value");
+ try root.put("stored_string", null);
+ try request.store.put("example-key", data.string("example-value"));
}
- // Pop an item from the array and store it in the root data object. This will empty the
+ // Left-pop an item from the array and store it in the root data object. This will empty the
// array after multiple requests.
- if (request.kvPop("example-array")) |string| try root.put("popped", data.string(string));
-
- // Fetch an array from the KV store. If it exists, store its values in the root data object,
- // otherwise store a new array to be picked up by the next request.
- if (request.kvGet(.array, "example-array")) |kv_array| {
- var array = try data.array();
- for (kv_array.items()) |item| try array.append(data.string(item));
- try root.put("stored_array", array);
+ if (try request.store.popFirst("example-array")) |value| {
+ try root.put("popped", value);
} else {
- // Create a KV Array and store it in the key value store.
- var kv_array = request.kvArray();
- try kv_array.append("hello");
- try kv_array.append("goodbye");
- try kv_array.append("hello again");
- try request.kvPut(.array, "example-array", kv_array);
+ try root.put("popped", null);
+ // Store some values in an array in the KV store.
+ try request.store.append("example-array", data.string("hello"));
+ try request.store.append("example-array", data.string("goodbye"));
+ try request.store.append("example-array", data.string("hello again"));
}
return request.render(.ok);
diff --git a/demo/src/app/views/kvstore/index.zmpl b/demo/src/app/views/kvstore/index.zmpl
new file mode 100644
index 0000000..8234d29
--- /dev/null
+++ b/demo/src/app/views/kvstore/index.zmpl
@@ -0,0 +1,4 @@
+Stored string: {{.stored_string}}
+Left-popped array value: {{.popped}}
+
+Refresh this page to cycle through values
diff --git a/demo/src/main.zig b/demo/src/main.zig
index 702b519..08213d1 100644
--- a/demo/src/main.zig
+++ b/demo/src/main.zig
@@ -41,6 +41,45 @@ pub const jetzig_options = struct {
// milliseconds.
// pub const job_worker_sleep_interval_ms: usize = 10;
+ /// Key-value store options. Set backend to `.file` to use a file-based store.
+ /// When using `.file` backend, you must also set `.file_options`.
+ /// The key-value store is exposed as `request.store` in views and is also available in as
+ /// `env.store` in all jobs/mailers.
+ pub const store: jetzig.kv.Store.KVOptions = .{
+ .backend = .memory,
+ // .backend = .file,
+ // .file_options = .{
+ // .path = "/path/to/jetkv-store.db",
+ // .truncate = false, // Set to `true` to clear the store on each server launch.
+ // .address_space_size = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096),
+ // },
+ };
+
+ /// Job queue options. Identical to `store` options, but allows using different
+ /// backends (e.g. `.memory` for key-value store, `.file` for jobs queue.
+ /// The job queue is managed internally by Jetzig.
+ pub const job_queue: jetzig.kv.Store.KVOptions = .{
+ .backend = .memory,
+ // .backend = .file,
+ // .file_options = .{
+ // .path = "/path/to/jetkv-queue.db",
+ // .truncate = false, // Set to `true` to clear the store on each server launch.
+ // .address_space_size = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096),
+ // },
+ };
+
+ /// Cache options. Identical to `store` options, but allows using different
+ /// backends (e.g. `.memory` for key-value store, `.file` for cache.
+ pub const cache: jetzig.kv.Store.KVOptions = .{
+ .backend = .memory,
+ // .backend = .file,
+ // .file_options = .{
+ // .path = "/path/to/jetkv-cache.db",
+ // .truncate = false, // Set to `true` to clear the store on each server launch.
+ // .address_space_size = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096),
+ // },
+ };
+
/// SMTP configuration for Jetzig Mail. It is recommended to use a local SMTP relay,
/// e.g.: https://github.com/juanluisbaptiste/docker-postfix
// pub const smtp: jetzig.mail.SMTPConfig = .{
diff --git a/src/jetzig.zig b/src/jetzig.zig
index 893b58d..4d18546 100644
--- a/src/jetzig.zig
+++ b/src/jetzig.zig
@@ -15,6 +15,7 @@ pub const types = @import("jetzig/types.zig");
pub const markdown = @import("jetzig/markdown.zig");
pub const jobs = @import("jetzig/jobs.zig");
pub const mail = @import("jetzig/mail.zig");
+pub const kv = @import("jetzig/kv.zig");
/// The primary interface for a Jetzig application. Create an `App` in your application's
/// `src/main.zig` and call `start` to launch the application.
@@ -99,6 +100,45 @@ pub const config = struct {
/// milliseconds.
pub const job_worker_sleep_interval_ms: usize = 10;
+ /// Key-value store options. Set backend to `.file` to use a file-based store.
+ /// When using `.file` backend, you must also set `.file_options`.
+ /// The key-value store is exposed as `request.store` in views and is also available in as
+ /// `env.store` in all jobs/mailers.
+ pub const store: kv.Store.KVOptions = .{
+ .backend = .memory,
+ // .backend = .file,
+ // .file_options = .{
+ // .path = "/path/to/jetkv-store.db",
+ // .truncate = false, // Set to `true` to clear the store on each server launch.
+ // .address_space_size = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096),
+ // },
+ };
+
+ /// Job queue options. Identical to `store` options, but allows using different
+ /// backends (e.g. `.memory` for key-value store, `.file` for jobs queue.
+ /// The job queue is managed internally by Jetzig.
+ pub const job_queue: kv.Store.KVOptions = .{
+ .backend = .memory,
+ // .backend = .file,
+ // .file_options = .{
+ // .path = "/path/to/jetkv-queue.db",
+ // .truncate = false, // Set to `true` to clear the store on each server launch.
+ // .address_space_size = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096),
+ // },
+ };
+
+ /// Cache. Identical to `store` options, but allows using different
+ /// backends (e.g. `.memory` for key-value store, `.file` for cache.
+ pub const cache: kv.Store.KVOptions = .{
+ .backend = .memory,
+ // .backend = .file,
+ // .file_options = .{
+ // .path = "/path/to/jetkv-cache.db",
+ // .truncate = false, // Set to `true` to clear the store on each server launch.
+ // .address_space_size = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096),
+ // },
+ };
+
/// SMTP configuration for Jetzig Mail.
pub const smtp: mail.SMTPConfig = .{
.port = 25,
@@ -133,7 +173,7 @@ pub fn init(allocator: std.mem.Allocator) !App {
const environment = Environment.init(allocator);
return .{
- .server_options = try environment.getServerOptions(),
+ .environment = environment,
.allocator = allocator,
};
}
diff --git a/src/jetzig/App.zig b/src/jetzig/App.zig
index 3a9f449..59af1df 100644
--- a/src/jetzig/App.zig
+++ b/src/jetzig/App.zig
@@ -7,7 +7,7 @@ const mime_types = @import("mime_types").mime_types; // Generated at build time.
const App = @This();
-server_options: jetzig.http.Server.ServerOptions,
+environment: jetzig.Environment,
allocator: std.mem.Allocator,
pub fn deinit(self: App) void {
@@ -57,9 +57,29 @@ pub fn start(self: App, routes_module: type, options: AppOptions) !void {
self.allocator.destroy(route);
};
- var jet_kv = jetzig.jetkv.JetKV.init(self.allocator, .{});
+ var store = try jetzig.kv.Store.init(
+ self.allocator,
+ jetzig.config.get(jetzig.kv.Store.KVOptions, "store"),
+ );
+ defer store.deinit();
- if (self.server_options.detach) {
+ var job_queue = try jetzig.kv.Store.init(
+ self.allocator,
+ jetzig.config.get(jetzig.kv.Store.KVOptions, "job_queue"),
+ );
+ defer job_queue.deinit();
+
+ var cache = try jetzig.kv.Store.init(
+ self.allocator,
+ jetzig.config.get(jetzig.kv.Store.KVOptions, "cache"),
+ );
+ defer cache.deinit();
+
+ const server_options = try self.environment.getServerOptions();
+ defer self.allocator.free(server_options.bind);
+ defer self.allocator.free(server_options.secret);
+
+ if (server_options.detach) {
const argv = try std.process.argsAlloc(self.allocator);
defer std.process.argsFree(self.allocator, argv);
var child_argv = std.ArrayList([]const u8).init(self.allocator);
@@ -76,24 +96,30 @@ pub fn start(self: App, routes_module: type, options: AppOptions) !void {
var server = jetzig.http.Server.init(
self.allocator,
- self.server_options,
+ server_options,
routes.items,
&routes_module.jobs,
&routes_module.mailers,
&mime_map,
- &jet_kv,
+ &store,
+ &job_queue,
+ &cache,
);
defer server.deinit();
+ var mutex = std.Thread.Mutex{};
var worker_pool = jetzig.jobs.Pool.init(
self.allocator,
- &jet_kv,
+ &job_queue,
.{
.logger = server.logger,
.environment = server.options.environment,
.routes = routes.items,
.jobs = &routes_module.jobs,
.mailers = &routes_module.mailers,
+ .store = &store,
+ .cache = &cache,
+ .mutex = &mutex,
},
);
defer worker_pool.deinit();
@@ -108,13 +134,13 @@ pub fn start(self: App, routes_module: type, options: AppOptions) !void {
error.AddressInUse => {
try server.logger.ERROR(
"Socket unavailable: {s}:{} - unable to start server.\n",
- .{ self.server_options.bind, self.server_options.port },
+ .{ server_options.bind, server_options.port },
);
return;
},
else => {
try server.logger.ERROR("Encountered error: {}\nExiting.\n", .{err});
- return err;
+ std.process.exit(1);
},
}
};
diff --git a/src/jetzig/http/Query.zig b/src/jetzig/http/Query.zig
index 0e59da1..b1c07c4 100644
--- a/src/jetzig/http/Query.zig
+++ b/src/jetzig/http/Query.zig
@@ -109,10 +109,10 @@ fn mappingParam(input: []const u8) ?struct { key: []const u8, field: []const u8
fn dataValue(self: Query, value: ?[]const u8) *jetzig.data.Data.Value {
if (value) |item_value| {
- const duped = self.data.getAllocator().dupe(u8, item_value) catch @panic("OOM");
+ const duped = self.data.allocator().dupe(u8, item_value) catch @panic("OOM");
return self.data.string(uriDecode(duped));
} else {
- return jetzig.zmpl.Data._null(self.data.getAllocator());
+ return jetzig.zmpl.Data._null(self.data.allocator());
}
}
diff --git a/src/jetzig/http/Request.zig b/src/jetzig/http/Request.zig
index 05976cf..a0f7b77 100644
--- a/src/jetzig/http/Request.zig
+++ b/src/jetzig/http/Request.zig
@@ -31,6 +31,60 @@ redirected: bool = false,
rendered_multiple: bool = false,
rendered_view: ?jetzig.views.View = null,
start_time: i128,
+store: ArenaStore,
+cache: ArenaStore,
+
+/// Wrapper for KV store that uses the request's arena allocator for fetching values.
+pub const ArenaStore = struct {
+ allocator: std.mem.Allocator,
+ store: *jetzig.kv.Store,
+
+ /// Put a String or into the key-value store.
+ pub fn get(self: ArenaStore, key: []const u8) !?*jetzig.data.Value {
+ return try self.store.get(try self.data(), key);
+ }
+
+ /// Get a String from the store.
+ pub fn put(self: ArenaStore, key: []const u8, value: *jetzig.data.Value) !void {
+ try self.store.put(key, value);
+ }
+
+ /// Remove a String to from the key-value store and return it if found.
+ pub fn fetchRemove(self: ArenaStore, key: []const u8) !?*jetzig.data.Value {
+ return try self.store.fetchRemove(try self.data(), key);
+ }
+
+ /// Remove a String to from the key-value store.
+ pub fn remove(self: ArenaStore, key: []const u8) !void {
+ try self.store.remove(key);
+ }
+
+ /// Append a Value to the end of an Array in the key-value store.
+ pub fn append(self: ArenaStore, key: []const u8, value: *jetzig.data.Value) !void {
+ try self.store.append(key, value);
+ }
+
+ /// Prepend a Value to the start of an Array in the key-value store.
+ pub fn prepend(self: ArenaStore, key: []const u8, value: *jetzig.data.Value) !void {
+ try self.store.prepend(key, value);
+ }
+
+ /// Pop a String from an Array in the key-value store.
+ pub fn pop(self: ArenaStore, key: []const u8) !?*jetzig.data.Value {
+ return try self.store.pop(try self.data(), key);
+ }
+
+ /// Left-pop a String from an Array in the key-value store.
+ pub fn popFirst(self: ArenaStore, key: []const u8) !?*jetzig.data.Value {
+ return try self.store.popFirst(try self.data(), key);
+ }
+
+ fn data(self: ArenaStore) !*jetzig.data.Data {
+ const arena_data = try self.allocator.create(jetzig.data.Data);
+ arena_data.* = jetzig.data.Data.init(self.allocator);
+ return arena_data;
+ }
+};
pub fn init(
allocator: std.mem.Allocator,
@@ -65,11 +119,14 @@ pub fn init(
.response_data = response_data,
.std_http_request = std_http_request,
.start_time = start_time,
+ .store = .{ .store = server.store, .allocator = allocator },
+ .cache = .{ .store = server.cache, .allocator = allocator },
};
}
pub fn deinit(self: *Request) void {
- // self.session.deinit();
+ self.session.deinit();
+ self.cookies.deinit();
self.allocator.destroy(self.cookies);
self.allocator.destroy(self.session);
if (self.processed) self.allocator.free(self.body);
@@ -275,37 +332,6 @@ fn parseQuery(self: *Request) !*jetzig.data.Value {
return self.query_body.?.data.value.?;
}
-/// Put a String or Array into the key-value store.
-/// `T` can be either `jetzig.KVString` or `jetzig.KVArray`
-pub fn kvPut(
- self: *Request,
- comptime value_type: jetzig.jetkv.value_types,
- key: jetzig.jetkv.types.String,
- value: jetzig.jetkv.ValueType(value_type),
-) !void {
- try self.server.jet_kv.put(value_type, key, value);
-}
-
-/// Get a String or Array from the key-value store.
-/// `T` can be either `jetzig.KVString` or `jetzig.KVArray`
-pub fn kvGet(
- self: *Request,
- comptime value_type: jetzig.jetkv.value_types,
- key: jetzig.jetkv.types.String,
-) ?jetzig.jetkv.ValueType(value_type) {
- return self.server.jet_kv.get(value_type, key);
-}
-
-/// Pop a String from an Array in the key-value store.
-pub fn kvPop(self: *Request, key: jetzig.jetkv.types.String) ?jetzig.jetkv.types.String {
- return self.server.jet_kv.pop(key);
-}
-
-/// Return a new Array suitable for use in the KV store.
-pub fn kvArray(self: Request) jetzig.jetkv.types.Array {
- return jetzig.jetkv.types.Array.init(self.allocator);
-}
-
/// Creates a new Job. Receives a job name which must resolve to `src/app/jobs/.zig`
/// Call `Job.put(...)` to set job params.
/// Call `Job.background()` to run the job outside of the request/response flow.
@@ -322,7 +348,9 @@ pub fn job(self: *Request, job_name: []const u8) !*jetzig.Job {
const background_job = try self.allocator.create(jetzig.Job);
background_job.* = jetzig.Job.init(
self.allocator,
- self.server.jet_kv,
+ self.server.store,
+ self.server.job_queue,
+ self.server.cache,
self.server.logger,
self.server.job_definitions,
job_name,
@@ -375,6 +403,9 @@ const RequestMail = struct {
.routes = self.request.server.routes,
.mailers = self.request.server.mailer_definitions,
.jobs = self.request.server.job_definitions,
+ .store = self.request.server.store,
+ .cache = self.request.server.cache,
+ .mutex = undefined,
},
),
}
diff --git a/src/jetzig/http/Server.zig b/src/jetzig/http/Server.zig
index 2155546..9434bd6 100644
--- a/src/jetzig/http/Server.zig
+++ b/src/jetzig/http/Server.zig
@@ -22,7 +22,9 @@ mailer_definitions: []const jetzig.MailerDefinition,
mime_map: *jetzig.http.mime.MimeMap,
std_net_server: std.net.Server = undefined,
initialized: bool = false,
-jet_kv: *jetzig.jetkv.JetKV,
+store: *jetzig.kv.Store,
+job_queue: *jetzig.kv.Store,
+cache: *jetzig.kv.Store,
const Server = @This();
@@ -33,7 +35,9 @@ pub fn init(
job_definitions: []const jetzig.JobDefinition,
mailer_definitions: []const jetzig.MailerDefinition,
mime_map: *jetzig.http.mime.MimeMap,
- jet_kv: *jetzig.jetkv.JetKV,
+ store: *jetzig.kv.Store,
+ job_queue: *jetzig.kv.Store,
+ cache: *jetzig.kv.Store,
) Server {
return .{
.allocator = allocator,
@@ -43,7 +47,9 @@ pub fn init(
.job_definitions = job_definitions,
.mailer_definitions = mailer_definitions,
.mime_map = mime_map,
- .jet_kv = jet_kv,
+ .store = store,
+ .job_queue = job_queue,
+ .cache = cache,
};
}
diff --git a/src/jetzig/jobs/Job.zig b/src/jetzig/jobs/Job.zig
index 6bf8376..f6f9b08 100644
--- a/src/jetzig/jobs/Job.zig
+++ b/src/jetzig/jobs/Job.zig
@@ -13,10 +13,15 @@ pub const JobEnv = struct {
routes: []*const jetzig.Route,
mailers: []const jetzig.MailerDefinition,
jobs: []const jetzig.JobDefinition,
+ store: *jetzig.kv.Store,
+ cache: *jetzig.kv.Store,
+ mutex: *std.Thread.Mutex,
};
allocator: std.mem.Allocator,
-jet_kv: *jetzig.jetkv.JetKV,
+store: *jetzig.kv.Store,
+job_queue: *jetzig.kv.Store,
+cache: *jetzig.kv.Store,
logger: jetzig.loggers.Logger,
name: []const u8,
definition: ?JobDefinition,
@@ -28,7 +33,9 @@ const Job = @This();
/// Initialize a new Job
pub fn init(
allocator: std.mem.Allocator,
- jet_kv: *jetzig.jetkv.JetKV,
+ store: *jetzig.kv.Store,
+ job_queue: *jetzig.kv.Store,
+ cache: *jetzig.kv.Store,
logger: jetzig.loggers.Logger,
jobs: []const JobDefinition,
name: []const u8,
@@ -47,7 +54,9 @@ pub fn init(
return .{
.allocator = allocator,
- .jet_kv = jet_kv,
+ .store = store,
+ .job_queue = job_queue,
+ .cache = cache,
.logger = logger,
.name = name,
.definition = definition,
@@ -65,7 +74,6 @@ pub fn deinit(self: *Job) void {
/// Add a Job to the queue
pub fn schedule(self: *Job) !void {
try self.params.put("__jetzig_job_name", self.data.string(self.name));
- const json = try self.data.toJson();
- try self.jet_kv.prepend("__jetzig_jobs", json);
+ try self.job_queue.append("__jetzig_jobs", self.data.value.?);
try self.logger.INFO("Scheduled job: {s}", .{self.name});
}
diff --git a/src/jetzig/jobs/Pool.zig b/src/jetzig/jobs/Pool.zig
index d40fd52..caa9e85 100644
--- a/src/jetzig/jobs/Pool.zig
+++ b/src/jetzig/jobs/Pool.zig
@@ -5,7 +5,7 @@ const jetzig = @import("../../jetzig.zig");
const Pool = @This();
allocator: std.mem.Allocator,
-jet_kv: *jetzig.jetkv.JetKV,
+job_queue: *jetzig.kv.Store,
job_env: jetzig.jobs.JobEnv,
pool: std.Thread.Pool = undefined,
workers: std.ArrayList(*jetzig.jobs.Worker),
@@ -13,12 +13,12 @@ workers: std.ArrayList(*jetzig.jobs.Worker),
/// Initialize a new worker thread pool.
pub fn init(
allocator: std.mem.Allocator,
- jet_kv: *jetzig.jetkv.JetKV,
+ job_queue: *jetzig.kv.Store,
job_env: jetzig.jobs.JobEnv,
) Pool {
return .{
.allocator = allocator,
- .jet_kv = jet_kv,
+ .job_queue = job_queue,
.job_env = job_env,
.workers = std.ArrayList(*jetzig.jobs.Worker).init(allocator),
};
@@ -42,7 +42,7 @@ pub fn work(self: *Pool, threads: usize, interval: usize) !void {
self.allocator,
self.job_env,
index,
- self.jet_kv,
+ self.job_queue,
interval,
);
try self.workers.append(worker);
diff --git a/src/jetzig/jobs/Worker.zig b/src/jetzig/jobs/Worker.zig
index 5455ef1..11bbe96 100644
--- a/src/jetzig/jobs/Worker.zig
+++ b/src/jetzig/jobs/Worker.zig
@@ -6,21 +6,21 @@ const Worker = @This();
allocator: std.mem.Allocator,
job_env: jetzig.jobs.JobEnv,
id: usize,
-jet_kv: *jetzig.jetkv.JetKV,
+job_queue: *jetzig.kv.Store,
interval: usize,
pub fn init(
allocator: std.mem.Allocator,
job_env: jetzig.jobs.JobEnv,
id: usize,
- jet_kv: *jetzig.jetkv.JetKV,
+ job_queue: *jetzig.kv.Store,
interval: usize,
) Worker {
return .{
.allocator = allocator,
.job_env = job_env,
.id = id,
- .jet_kv = jet_kv,
+ .job_queue = job_queue,
.interval = interval * 1000 * 1000, // millisecond => nanosecond
};
}
@@ -30,10 +30,16 @@ pub fn work(self: *const Worker) void {
self.log(.INFO, "[worker-{}] Job worker started.", .{self.id});
while (true) {
- if (self.jet_kv.pop("__jetzig_jobs")) |json| {
- defer self.allocator.free(json);
- if (self.matchJob(json)) |job_definition| {
- self.processJob(job_definition, json);
+ var data = jetzig.data.Data.init(self.allocator);
+ defer data.deinit();
+ const maybe_value = self.job_queue.popFirst(&data, "__jetzig_jobs") catch |err| blk: {
+ self.log(.ERROR, "Error fetching job from queue: {s}", .{@errorName(err)});
+ break :blk null; // FIXME: Probably close thread here ?
+ };
+
+ if (maybe_value) |value| {
+ if (self.matchJob(value)) |job_definition| {
+ self.processJob(job_definition, value);
}
} else {
std.time.sleep(self.interval);
@@ -44,27 +50,19 @@ pub fn work(self: *const Worker) void {
}
// Do a minimal parse of JSON job data to identify job name, then match on known job definitions.
-fn matchJob(self: Worker, json: []const u8) ?jetzig.jobs.JobDefinition {
- const parsed_json = std.json.parseFromSlice(
- struct { __jetzig_job_name: []const u8 },
- self.allocator,
- json,
- .{ .ignore_unknown_fields = true },
- ) catch |err| {
+fn matchJob(self: Worker, value: *const jetzig.data.Value) ?jetzig.jobs.JobDefinition {
+ const job_name = value.getT(.string, "__jetzig_job_name") orelse {
self.log(
.ERROR,
- "[worker-{}] Error parsing JSON from job queue: {s}",
- .{ self.id, @errorName(err) },
+ "[worker-{}] Missing expected job name field `__jetzig_job_name`",
+ .{self.id},
);
return null;
};
- const job_name = parsed_json.value.__jetzig_job_name;
-
// TODO: Hashmap
for (self.job_env.jobs) |job_definition| {
if (std.mem.eql(u8, job_definition.name, job_name)) {
- parsed_json.deinit();
return job_definition;
}
} else {
@@ -75,34 +73,19 @@ fn matchJob(self: Worker, json: []const u8) ?jetzig.jobs.JobDefinition {
// Fully parse JSON job data and invoke the defined job's run function, passing the parsed params
// as a `*jetzig.data.Value`.
-fn processJob(self: Worker, job_definition: jetzig.JobDefinition, json: []const u8) void {
- var data = jetzig.data.Data.init(self.allocator);
- defer data.deinit();
-
- data.fromJson(json) catch |err| {
- self.log(
- .INFO,
- "[worker-{}] Error parsing JSON for job `{s}`: {s}",
- .{ self.id, job_definition.name, @errorName(err) },
- );
- };
-
+fn processJob(self: Worker, job_definition: jetzig.JobDefinition, params: *jetzig.data.Value) void {
var arena = std.heap.ArenaAllocator.init(self.allocator);
defer arena.deinit();
- if (data.value) |params| {
- job_definition.runFn(arena.allocator(), params, self.job_env) catch |err| {
- self.log(
- .ERROR,
- "[worker-{}] Encountered error processing job `{s}`: {s}",
- .{ self.id, job_definition.name, @errorName(err) },
- );
- return;
- };
- self.log(.INFO, "[worker-{}] Job completed: {s}", .{ self.id, job_definition.name });
- } else {
- self.log(.ERROR, "Error in job params definition for job: {s}", .{job_definition.name});
- }
+ job_definition.runFn(arena.allocator(), params, self.job_env) catch |err| {
+ self.log(
+ .ERROR,
+ "[worker-{}] Encountered error processing job `{s}`: {s}",
+ .{ self.id, job_definition.name, @errorName(err) },
+ );
+ return;
+ };
+ self.log(.INFO, "[worker-{}] Job completed: {s}", .{ self.id, job_definition.name });
}
// Log with error handling and fallback. Prefix with worker ID.
diff --git a/src/jetzig/kv.zig b/src/jetzig/kv.zig
new file mode 100644
index 0000000..96e3052
--- /dev/null
+++ b/src/jetzig/kv.zig
@@ -0,0 +1,3 @@
+const std = @import("std");
+
+pub const Store = @import("kv/Store.zig");
diff --git a/src/jetzig/kv/Store.zig b/src/jetzig/kv/Store.zig
new file mode 100644
index 0000000..cddd1bb
--- /dev/null
+++ b/src/jetzig/kv/Store.zig
@@ -0,0 +1,94 @@
+const std = @import("std");
+const jetzig = @import("../../jetzig.zig");
+
+const Store = @This();
+
+store: jetzig.jetkv.JetKV,
+options: KVOptions,
+
+pub const KVOptions = struct {
+ backend: enum { memory, file } = .memory,
+ file_options: struct {
+ path: ?[]const u8 = null,
+ address_space_size: u32 = jetzig.jetkv.JetKV.FileBackend.addressSpace(4096),
+ truncate: bool = false,
+ } = .{},
+};
+
+const ValueType = enum { string, array };
+
+/// Initialize a new memory or file store.
+pub fn init(allocator: std.mem.Allocator, options: KVOptions) !Store {
+ const store = try jetzig.jetkv.JetKV.init(
+ allocator,
+ switch (options.backend) {
+ .file => .{
+ .backend = .file,
+ .file_backend_options = .{
+ .path = options.file_options.path,
+ .address_space_size = options.file_options.address_space_size,
+ .truncate = options.file_options.truncate,
+ },
+ },
+ .memory => .{
+ .backend = .memory,
+ },
+ },
+ );
+
+ return .{ .store = store, .options = options };
+}
+
+/// Free allocated resources/close database file.
+pub fn deinit(self: *Store) void {
+ self.store.deinit();
+}
+
+/// Put a Value or into the key-value store.
+pub fn put(self: *Store, key: []const u8, value: *jetzig.data.Value) !void {
+ try self.store.put(key, try value.toJson());
+}
+
+/// Get a Value from the store.
+pub fn get(self: *Store, data: *jetzig.data.Data, key: []const u8) !?*jetzig.data.Value {
+ return try parseValue(data, try self.store.get(data.allocator(), key));
+}
+
+/// Remove a Value to from the key-value store and return it if found.
+pub fn fetchRemove(self: *Store, data: *jetzig.data.Data, key: []const u8) !?*jetzig.data.Value {
+ return try parseValue(data, try self.store.fetchRemove(data.allocator(), key));
+}
+
+/// Remove a Value to from the key-value store.
+pub fn remove(self: *Store, key: []const u8) !void {
+ try self.store.remove(key);
+}
+
+/// Append a Value to the end of an Array in the key-value store.
+pub fn append(self: *Store, key: []const u8, value: *const jetzig.data.Value) !void {
+ try self.store.append(key, try value.toJson());
+}
+
+/// Prepend a Value to the start of an Array in the key-value store.
+pub fn prepend(self: *Store, key: []const u8, value: *const jetzig.data.Value) !void {
+ try self.store.prepend(key, try value.toJson());
+}
+
+/// Pop a Value from an Array in the key-value store.
+pub fn pop(self: *Store, data: *jetzig.data.Data, key: []const u8) !?*jetzig.data.Value {
+ return try parseValue(data, try self.store.pop(data.allocator(), key));
+}
+
+/// Left-pop a Value from an Array in the key-value store.
+pub fn popFirst(self: *Store, data: *jetzig.data.Data, key: []const u8) !?*jetzig.data.Value {
+ return try parseValue(data, try self.store.popFirst(data.allocator(), key));
+}
+
+fn parseValue(data: *jetzig.data.Data, maybe_json: ?[]const u8) !?*jetzig.data.Value {
+ if (maybe_json) |json| {
+ try data.fromJson(json);
+ return data.value.?;
+ } else {
+ return null;
+ }
+}
diff --git a/src/jetzig/views/Route.zig b/src/jetzig/views/Route.zig
index 38aceef..ae903f6 100644
--- a/src/jetzig/views/Route.zig
+++ b/src/jetzig/views/Route.zig
@@ -65,7 +65,7 @@ pub fn initParams(self: *Route, allocator: std.mem.Allocator) !void {
pub fn deinitParams(self: *const Route) void {
for (self.params.items) |data| {
data.deinit();
- data._allocator.destroy(data);
+ data.parent_allocator.destroy(data);
}
self.params.deinit();
}