file_upload_api.dart 19 KB


  1. import 'dart:io';
  2. import 'package:file_upload_processor/handlers/base_api.dart';
  3. import 'package:intl/intl.dart';
  4. import 'package:mime/mime.dart';
  5. import 'package:shelf/shelf.dart' as shelf;
  6. import 'package:path/path.dart' as path;
  7. import 'package:http_parser/http_parser.dart';
  8. import 'package:archive/archive.dart';
  9. import 'package:supabase/supabase.dart';
  10. class FileUploadApi extends BaseApi {
  11. FileUploadApi(shelf.Request request) : super(request);
  12. final uploadFolder = "./uploaded";
  13. String get rawFolder => "$uploadFolder/raw";
  14. String get dataFolder => "$uploadFolder/data";
  15. String workingFolder = "";
  16. String get zipFolder => "$rawFolder/$workingFolder";
  17. String get extFolder => "$dataFolder/$workingFolder";
  18. SupabaseClient getSupabaseClient(shelf.Request request) {
  19. final supabaseUrl = request.headers['supabase-url'];
  20. if (supabaseUrl == null) {
  21. throw Exception('Supabase URL not provided in headers');
  22. }
  23. final authHeader = request.headers['authorization'];
  24. if (authHeader == null || !authHeader.startsWith('Bearer ')) {
  25. throw Exception('Invalid or missing Authorization Bearer token');
  26. }
  27. final token = authHeader.substring(7); // Remove 'Bearer ' prefix
  28. return SupabaseClient(
  29. supabaseUrl,
  30. token,
  31. );
  32. }
  33. String getTimestampedFilename(String originalFilename) {
  34. final timestamp = DateFormat('yyyyMMdd_HHmmss').format(DateTime.now());
  35. return '${timestamp}_$originalFilename';
  36. }
  37. Future<void> uploadToSupabase(
  38. String filePath, String filename, SupabaseClient supabaseClient,
  39. {bool timestamped = false,
  40. required String bucket,
  41. bool upsert = true}) async {
  42. try {
  43. final file = File(filePath);
  44. final timestampedFilename =
  45. timestamped ? getTimestampedFilename(filename) : filename;
  46. await supabaseClient.storage.from(bucket).upload(
  47. timestampedFilename,
  48. file,
  49. fileOptions: FileOptions(
  50. cacheControl: '3600',
  51. upsert: upsert,
  52. ),
  53. );
  54. print('+File uploaded to <$bucket>: $timestampedFilename');
  55. } catch (e) {
  56. print('!Error uploading to Supabase: $e');
  57. rethrow;
  58. }
  59. }
  60. Future<void> initializeDirectories() async {
  61. final directories = [
  62. Directory(rawFolder),
  63. Directory(dataFolder),
  64. Directory(zipFolder),
  65. Directory(extFolder),
  66. ];
  67. for (var dir in directories) {
  68. if (!await dir.exists()) {
  69. await dir.create(recursive: true);
  70. }
  71. }
  72. }
  73. bool isZipFile(List<int> bytes) {
  74. if (bytes.length < 4) return false;
  75. return bytes[0] == 0x50 &&
  76. bytes[1] == 0x4B &&
  77. bytes[2] == 0x03 &&
  78. bytes[3] == 0x04;
  79. }
  80. Future<List<String>> processZipFile(String filePath) async {
  81. List<String> files = [];
  82. final bytes = await File(filePath).readAsBytes();
  83. final archive = ZipDecoder().decodeBytes(bytes);
  84. for (final file in archive) {
  85. final filename = file.name;
  86. if (file.isFile) {
  87. final data = file.content as List<int>;
  88. final outFile = File(path.join(extFolder, filename));
  89. await outFile.parent.create(recursive: true);
  90. await outFile.writeAsBytes(data);
  91. files.add(path.join(extFolder, filename));
  92. }
  93. }
  94. return files;
  95. }
  96. @override
  97. response() async {
  98. workingFolder = DateTime.now().millisecondsSinceEpoch.toString();
  99. final supabaseClient = getSupabaseClient(request);
  100. await initializeDirectories();
  101. final contentType = request.headers['content-type'];
  102. if (contentType == null ||
  103. !contentType.toLowerCase().startsWith('multipart/form-data')) {
  104. return shelf.Response.badRequest(
  105. body: 'Content-Type must be multipart/form-data');
  106. }
  107. try {
  108. final mediaType = MediaType.parse(contentType);
  109. final boundary = mediaType.parameters['boundary'];
  110. if (boundary == null) {
  111. return shelf.Response.badRequest(body: 'Boundary not found');
  112. }
  113. final transformer = MimeMultipartTransformer(boundary);
  114. final bodyBytes = await request.read().expand((e) => e).toList();
  115. final stream = Stream.fromIterable([bodyBytes]);
  116. final parts = await transformer.bind(stream).toList();
  117. for (var part in parts) {
  118. final contentDisposition = part.headers['content-disposition'];
  119. if (contentDisposition == null) continue;
  120. final filenameMatch =
  121. RegExp(r'filename="([^"]*)"').firstMatch(contentDisposition);
  122. if (filenameMatch == null) continue;
  123. final filename = filenameMatch.group(1);
  124. if (filename == null) continue;
  125. final bytes = await part.fold<List<int>>(
  126. [],
  127. (prev, element) => [...prev, ...element],
  128. );
  129. final rawFilePath = path.join(zipFolder, filename);
  130. await File(rawFilePath).writeAsBytes(bytes);
  131. List<String> files = [];
  132. if (isZipFile(bytes)) {
  133. files.addAll(await processZipFile(rawFilePath));
  134. } else {
  135. final dataFilePath = path.join(extFolder, filename);
  136. await File(rawFilePath).copy(dataFilePath);
  137. files.add(dataFilePath);
  138. }
  139. bytes.clear();
  140. //upload to supabase storage
  141. await uploadToSupabase(rawFilePath, filename, supabaseClient,
  142. bucket: 'csvhich', timestamped: false, upsert: true);
  143. //upload to supabase storage archive timestamped
  144. await uploadToSupabase(rawFilePath, filename, supabaseClient,
  145. bucket: 'csvhich_archive', timestamped: true, upsert: false);
  146. //insert data to supabase csvhichupdates
  147. await supabaseClient
  148. .from('csvhichupdates')
  149. .insert({'filename': filename});
  150. for (var file in files) {
  151. final fileProcess = FileProcess(file, supabaseClient);
  152. await fileProcess.go(donttouchdb: true);
  153. }
  154. }
  155. return shelf.Response.ok('File processed and uploaded successfully');
  156. } catch (e, stackTrace) {
  157. //print('Error: $e\n$stackTrace');
  158. return shelf.Response.internalServerError(
  159. body: 'Error processing upload: $e');
  160. } finally {
  161. supabaseClient.dispose();
  162. await File(zipFolder).delete(recursive: true);
  163. await File(extFolder).delete(recursive: true);
  164. }
  165. }
  166. }
  167. class FileProcess {
  168. FileProcess(this.filepath, this.supabase);
  169. final String filepath;
  170. final SupabaseClient supabase;
  171. String get filename => filepath.replaceAll('\\', "/").split("/").last;
  172. final Map<String, String> tables = {
  173. "secondprgtype.txt": "aclegs_csv",
  174. "ExportPGRGPNmois.txt": "pnlegs_csv",
  175. "exportPGRGPN.txt": "pnlegs_csv",
  176. "exportlicence.txt": "licences_csv",
  177. };
  178. final Map<String, List<String>> _headers = {
  179. "secondprgtype.txt": [
  180. "leg_no",
  181. "fn_carrier",
  182. "fn_number",
  183. "fn_suffix",
  184. "day_of_origin",
  185. "ac_owner",
  186. "ac_subtype",
  187. "ac_version",
  188. "ac_registration",
  189. "dep_ap_actual",
  190. "dep_ap_sched",
  191. "dep_dt_est",
  192. "dep_sched_dt",
  193. "arr_ap_actual",
  194. "arr_ap_sched",
  195. "arr_dt_est",
  196. "arr_sched_dt",
  197. "slot_time_actual",
  198. "leg_type",
  199. "status",
  200. "employer_cockpit",
  201. "employer_cabin",
  202. "cycles",
  203. "delay_code_01",
  204. "delay_code_02",
  205. "delay_code_03",
  206. "delay_code_04",
  207. "delay_time_01",
  208. "delay_time_02",
  209. "delay_time_03",
  210. "delay_time_04",
  211. "subdelay_code_01",
  212. "subdelay_code_02",
  213. "subdelay_code_03",
  214. "subdelay_code_04",
  215. "pax_booked_c",
  216. "pax_booked_y",
  217. "pax_booked_trs_c",
  218. "pax_booked_trs_y",
  219. "pad_booked_c",
  220. "pad_booked_y",
  221. "offblock_dt_a",
  222. "airborne_dt_a",
  223. "landing_dt_a",
  224. "onblock_dt_a",
  225. "offblock_dt_f",
  226. "airborne_dt_f",
  227. "landing_dt_f",
  228. "onblock_dt_f",
  229. "offblock_dt_m",
  230. "airborne_dt_m",
  231. "landing_dt_m",
  232. "onblock_dt_m",
  233. "eet",
  234. ],
  235. "exportPGRGPN.txt": [
  236. "date",
  237. "tlc",
  238. "actype",
  239. "al",
  240. "fnum",
  241. "ddep",
  242. "hdep",
  243. "ddes",
  244. "hdes",
  245. "dep",
  246. "des",
  247. "label",
  248. "type",
  249. ],
  250. "ExportPGRGPNmois.txt": [
  251. "date",
  252. "tlc",
  253. "actype",
  254. "al",
  255. "fnum",
  256. "ddep",
  257. "hdep",
  258. "ddes",
  259. "hdes",
  260. "dep",
  261. "des",
  262. "label",
  263. "type",
  264. ],
  265. "exportlicence.txt": [
  266. "tlc",
  267. "fname",
  268. "mname",
  269. "lname",
  270. "expire",
  271. "ac",
  272. "college",
  273. "base",
  274. ],
  275. };
  276. final Map<String, String> scopes = {
  277. "secondprgtype.txt": "day_of_origin",
  278. "exportPGRGPN.txt": "date",
  279. "ExportPGRGPNmois.txt": "date",
  280. "exportlicence.txt": "tlc",
  281. };
  282. final Map<String, List<String>> idToRemove = {
  283. "secondprgtype.txt": ["day_of_origin"],
  284. "exportPGRGPN.txt": ["date", "tlc"],
  285. "ExportPGRGPNmois.txt": ["date", "tlc"],
  286. "exportlicence.txt": ["tlc"],
  287. };
  288. final Map<String, List<Map<String, dynamic>>> trackers = {
  289. /* "secondprgtype.txt": {
  290. "table": "aclegs_log",
  291. "headers": [
  292. "day_of_origin",
  293. "dep_sched_dt",
  294. "fn_carrier",
  295. "fn_number",
  296. "dep_ap_sched",
  297. "arr_ap_sched",
  298. // "dep_ap_actual",
  299. // "arr_ap_actual"
  300. ]
  301. },
  302. */
  303. "exportPGRGPN.txt": [
  304. {
  305. "table": "pnlegs_log_roster",
  306. "groupby": ["date", "tlc"],
  307. "track": ["dep", "des", "al", "fnum", "label"]
  308. },
  309. // {
  310. // "table": "pnlegs_log_duty",
  311. // "groupby": ["date", "dep", "des", "al", "fnum", "label"],
  312. // "track": ["tlc"]
  313. // },
  314. // {
  315. // "table": "pnlegs_log_sched",
  316. // "groupby": ["date", "dep", "des", "al", "fnum", "label"],
  317. // "changes": ["hdep", "hdes"]
  318. // },
  319. ],
  320. /* "ExportPGRGPNmois.txt": {
  321. "table": "pnlegs_log",
  322. "headers": ["tlc", "date", "dep", "des", "al", "fnum", "label"]
  323. },
  324. "exportlicence.txt": {
  325. "table": "qualifs_log",
  326. "headers": ["tlc", "college", "ac", "base"]
  327. },
  328. */
  329. };
  330. Future<List<Map<String, dynamic>>> parseCsv() async {
  331. final headers = _headers[filename] ?? [];
  332. if (headers.isEmpty) {
  333. throw Exception('No headers found for file: $filename');
  334. }
  335. // Initialize an empty list to hold the parsed data
  336. List<Map<String, dynamic>> data = [];
  337. // Read the CSV file
  338. final file = File(filepath);
  339. final lines = await file.readAsLines();
  340. // Iterate over each line in the CSV file
  341. for (int i = 0; i < lines.length; i++) {
  342. // Split the line into individual values
  343. final values = lines[i].split(',');
  344. if (values.length != headers.length) {
  345. // print('Skipping line $i: Incorrect number of values: line: $i');
  346. continue;
  347. }
  348. // Create a map for the current row
  349. Map<String, dynamic> row = {};
  350. // Assign each value to the corresponding header
  351. for (int j = 0; j < headers.length; j++) {
  352. row[headers[j]] = values[j].trim().removeQuotes.trim().nullIfEmpty;
  353. }
  354. // Add the row map to the data list
  355. data.add(row);
  356. }
  357. // Return the parsed data
  358. return data;
  359. }
  360. List<String> get filesTomonitor => _headers.keys.toList();
  361. Future<void> go({bool donttouchdb = false}) async {
  362. if (!filesTomonitor.contains(filename)) return;
  363. final allmapsToInsert = await parseCsv();
  364. final scopeName = scopes[filename] ?? "";
  365. final scopesInNew = allmapsToInsert
  366. .fold(<String>{}, (t, e) => t..add(e[scopeName] ?? "")).toList();
  367. for (var scopeInNew in scopesInNew) {
  368. final mapsToInsert =
  369. allmapsToInsert.where((e) => e[scopeName] == scopeInNew).toList();
  370. List<Map<String, dynamic>> oldIds = [];
  371. List<Map<String, dynamic>> oldComparable = [];
  372. //load old data
  373. final res = await supabase
  374. .from(tables[filename]!)
  375. .select()
  376. .eq(scopeName, scopeInNew)
  377. .limit(300000);
  378. oldIds.addAll(res.map((e) => {"id": e["id"]}));
  379. oldComparable.addAll(res.map((e) => e..remove("id")));
  380. final comparisonResult = compareLists(oldComparable, mapsToInsert);
  381. final indexToRemove = comparisonResult.removeIndices;
  382. final indexToMaintain = comparisonResult.maintainIndices;
  383. final dataToInsert = comparisonResult.insertData;
  384. try {
  385. if (!donttouchdb)
  386. for (var e in chunkList(
  387. indexToRemove.map((f) => oldIds[f]['id']).toList(), 100)) {
  388. await supabase
  389. .from(tables[filename]!) // Replace with your actual table name
  390. .delete()
  391. .inFilter('id', e);
  392. }
  393. // insering new data
  394. if (!donttouchdb)
  395. await supabase
  396. .from(tables[filename]!) // Replace with your actual table name
  397. .insert(dataToInsert);
  398. } catch (e, stackTrace) {
  399. print('Error: $e\n$stackTrace');
  400. }
  401. print(
  402. " Scope:$scopeInNew insert:${dataToInsert.length} remove:${indexToRemove.length} maintain:${indexToMaintain.length}");
  403. for (var tracker in trackers[filename] ?? []) {
  404. final table = tracker["table"];
  405. final groupby = tracker["groupby"] ?? [];
  406. final track = tracker["track"] ?? [];
  407. final stateOld = oldComparable.groupBy(
  408. (e) => groupby.map((f) => e[f]).join("|"),
  409. dataFunction: (e) => e
  410. .filterKeys(track)
  411. .values
  412. .map((j) => j == null ? "" : j)
  413. .join("_"));
  414. final stateNew = dataToInsert.groupBy(
  415. (e) => groupby.map((f) => e[f]).join("|"),
  416. dataFunction: (e) => e
  417. .filterKeys(track)
  418. .values
  419. .map((j) => j == null ? "" : j)
  420. .join("_"));
  421. List logs = [];
  422. for (var key
  423. in (stateOld.keys.toList()..addAll(stateNew.keys)).toSet()) {
  424. final (add, remove) = (stateNew[key] ?? []).diff(stateOld[key] ?? []);
  425. if (add.isNotEmpty || remove.isNotEmpty) {
  426. logs.add("$key:\n +$add}\n -$remove\n");
  427. }
  428. }
  429. print(" Tracker:$table");
  430. print(" $logs");
  431. }
  432. }
  433. }
  434. ({
  435. List<int> maintainIndices,
  436. List<int> removeIndices,
  437. // List<int> insertIndices
  438. List<Map> insertData
  439. }) compareLists(
  440. List<Map<String, dynamic>> map1, List<Map<String, dynamic>> map2) {
  441. List<int> maintainIndices = [];
  442. List<int> removeIndices = [];
  443. List<Map<String, dynamic>> insertData = map2;
  444. // Find indices to maintain and remove in map1
  445. for (int i = 0; i < map1.length; i++) {
  446. final pos = insertData.findMap(map1[i]);
  447. if (pos > -1) {
  448. maintainIndices.add(i); // Item exists in both lists
  449. insertData.removeAt(pos);
  450. } else {
  451. removeIndices.add(i); // Item does not exist in map2
  452. }
  453. }
  454. return (
  455. maintainIndices: maintainIndices,
  456. removeIndices: removeIndices,
  457. insertData: insertData
  458. );
  459. }
  460. List<List<T>> chunkList<T>(List<T> list, int chunkSize) {
  461. if (chunkSize <= 0) {
  462. throw ArgumentError('chunkSize must be greater than 0');
  463. }
  464. List<List<T>> chunks = [];
  465. for (var i = 0; i < list.length; i += chunkSize) {
  466. chunks.add(list.sublist(
  467. i, i + chunkSize > list.length ? list.length : i + chunkSize));
  468. }
  469. return chunks;
  470. }
  471. }
  472. extension IterableDiff<T> on Iterable<T> {
  473. (Iterable<T> add, Iterable<T> remove) diff(Iterable<T> listB) {
  474. // Convert listA to a list for easier indexing
  475. final listA = this;
  476. // Items to add are those in listB but not in listA
  477. final add = listB.where((item) => !listA.contains(item));
  478. // Items to remove are those in listA but not in listB
  479. final remove = listA.where((item) => !listB.contains(item));
  480. return (add, remove);
  481. }
  482. }
  483. extension CompareIterables<T> on Iterable<T> {
  484. /// Compares this iterable with another iterable and returns a map containing:
  485. /// - 'added': Items that are in the other iterable but not in this one.
  486. /// - 'removed': Items that are in this iterable but not in the other one.
  487. (Iterable<T> add, Iterable<T> remove) compareWith(Iterable<T> other) {
  488. final Set<T> thisSet = this.toSet();
  489. final Set<T> otherSet = other.toSet();
  490. final Set<T> added = otherSet.difference(thisSet);
  491. final Set<T> removed = thisSet.difference(otherSet);
  492. return (added, removed);
  493. }
  494. }
  495. extension FilterMapByKeys on Map {
  496. /// Returns a new map containing only the keys (and their associated values)
  497. /// that are present in the [keysToKeep] list.
  498. Map<K, V> filterKeys<K, V>(List<K> keysToKeep) {
  499. return Map<K, V>.fromEntries(
  500. entries
  501. .where((entry) => keysToKeep.contains(entry.key))
  502. .cast<MapEntry<K, V>>(),
  503. );
  504. }
  505. }
  506. extension RemoveNull<T> on Iterable<T?> {
  507. /// Returns a new iterable with all null values removed.
  508. Iterable<T> removeNull() {
  509. return where((element) => element != null).cast<T>();
  510. }
  511. }
  512. extension GroupBy<T> on Iterable<T> {
  513. Map<K, List> groupBy<K>(K Function(T) keyFunction,
  514. {Function(T)? dataFunction, bool Function(T)? keyIsNullFunction}) {
  515. final map = <K, List>{};
  516. for (final element in this) {
  517. final key = keyFunction(element);
  518. final keyIsNull =
  519. keyIsNullFunction == null ? false : keyIsNullFunction(element);
  520. if (keyIsNull || key == null) continue;
  521. if (dataFunction != null) {
  522. map.putIfAbsent(key, () => []).add(dataFunction(element));
  523. } else
  524. map.putIfAbsent(key, () => []).add(element);
  525. }
  526. return map;
  527. }
  528. }
  529. extension NullIfEmpty on String {
  530. String? get nullIfEmpty => isEmpty ? null : this;
  531. }
  532. extension RemoveQuotes on String {
  533. String get removeQuotes {
  534. if (isEmpty) return this; // Return if the string is empty
  535. // Remove the first and last characters if they are quotes
  536. String result = this;
  537. // Check if the first character is a quote
  538. bool startsWithQuote = result.startsWith('"') || result.startsWith("'");
  539. if (startsWithQuote) result = result.substring(1);
  540. // Check if the last character is a quote
  541. bool endsWithQuote = result.endsWith('"') || result.endsWith("'");
  542. if (endsWithQuote) result = result.substring(0, result.length - 1);
  543. return result;
  544. }
  545. }
  546. bool mapsAreEqual(Map<String, dynamic> map1, Map<String, dynamic> map2) {
  547. if (map1.length != map2.length) return false;
  548. for (var key in map1.keys) {
  549. if (map1[key] != map2[key]) return false;
  550. }
  551. return true;
  552. }
  553. extension ContainsMap on List<Map<String, dynamic>> {
  554. bool containsMap(Map<String, dynamic> map) {
  555. for (var item in this) {
  556. if (mapsAreEqual(item, map)) return true;
  557. }
  558. return false;
  559. }
  560. int findMap(Map<String, dynamic> map) {
  561. for (int i = 0; i < this.length; i++) {
  562. if (mapsAreEqual(this.elementAt(i), map)) return i;
  563. }
  564. return -1;
  565. }
  566. }