import 'dart:collection'; import 'dart:io'; import 'package:file_upload_processor/handlers/base_api.dart'; import 'package:intl/intl.dart'; import 'package:mime/mime.dart'; import 'package:shelf/shelf.dart' as shelf; import 'package:path/path.dart' as path; import 'package:http_parser/http_parser.dart'; import 'package:archive/archive.dart'; import 'package:supabase/supabase.dart'; class FileUploadApi extends BaseApi { FileUploadApi(shelf.Request request) : super(request); final uploadFolder = "./uploaded"; String get rawFolder => "$uploadFolder/raw"; String get dataFolder => "$uploadFolder/data"; String workingFolder = ""; String get zipFolder => "$rawFolder/$workingFolder"; String get extFolder => "$dataFolder/$workingFolder"; SupabaseClient getSupabaseClient(shelf.Request request) { final supabaseUrl = request.headers['supabase-url']; if (supabaseUrl == null) { throw Exception('Supabase URL not provided in headers'); } final authHeader = request.headers['authorization']; if (authHeader == null || !authHeader.startsWith('Bearer ')) { throw Exception('Invalid or missing Authorization Bearer token'); } final token = authHeader.substring(7); // Remove 'Bearer ' prefix return SupabaseClient( supabaseUrl, token, ); } String getTimestampedFilename(String originalFilename) { final timestamp = DateFormat('yyyyMMdd_HHmmss').format(DateTime.now()); return '${timestamp}_$originalFilename'; } Future uploadToSupabase( String filePath, String filename, SupabaseClient supabaseClient, {bool timestamped = false, required String bucket, bool upsert = true}) async { try { final file = File(filePath); final timestampedFilename = timestamped ? getTimestampedFilename(filename) : filename; await supabaseClient.storage.from(bucket).upload( timestampedFilename, file, fileOptions: FileOptions( cacheControl: '3600', upsert: upsert, ), ); print('File uploaded to Supabase: $timestampedFilename'); } catch (e) { print('Error uploading to Supabase: $e'); rethrow; } } Future initializeDirectories() async { final directories = [ Directory(rawFolder), Directory(dataFolder), Directory(zipFolder), Directory(extFolder), ]; for (var dir in directories) { if (!await dir.exists()) { await dir.create(recursive: true); } } } bool isZipFile(List bytes) { if (bytes.length < 4) return false; return bytes[0] == 0x50 && bytes[1] == 0x4B && bytes[2] == 0x03 && bytes[3] == 0x04; } Future> processZipFile(String filePath) async { List files = []; final bytes = await File(filePath).readAsBytes(); final archive = ZipDecoder().decodeBytes(bytes); for (final file in archive) { final filename = file.name; if (file.isFile) { final data = file.content as List; final outFile = File(path.join(extFolder, filename)); await outFile.parent.create(recursive: true); await outFile.writeAsBytes(data); files.add(path.join(extFolder, filename)); } } return files; } @override response() async { workingFolder = DateTime.now().millisecondsSinceEpoch.toString(); final supabaseClient = getSupabaseClient(request); await initializeDirectories(); final contentType = request.headers['content-type']; if (contentType == null || !contentType.toLowerCase().startsWith('multipart/form-data')) { return shelf.Response.badRequest( body: 'Content-Type must be multipart/form-data'); } try { final mediaType = MediaType.parse(contentType); final boundary = mediaType.parameters['boundary']; if (boundary == null) { return shelf.Response.badRequest(body: 'Boundary not found'); } final transformer = MimeMultipartTransformer(boundary); final bodyBytes = await request.read().expand((e) => e).toList(); final stream = Stream.fromIterable([bodyBytes]); final parts = await transformer.bind(stream).toList(); for (var part in parts) { final contentDisposition = part.headers['content-disposition']; if (contentDisposition == null) continue; final filenameMatch = RegExp(r'filename="([^"]*)"').firstMatch(contentDisposition); if (filenameMatch == null) continue; final filename = filenameMatch.group(1); if (filename == null) continue; final bytes = await part.fold>( [], (prev, element) => [...prev, ...element], ); final rawFilePath = path.join(zipFolder, filename); await File(rawFilePath).writeAsBytes(bytes); List files = []; if (isZipFile(bytes)) { files.addAll(await processZipFile(rawFilePath)); } else { final dataFilePath = path.join(extFolder, filename); await File(rawFilePath).copy(dataFilePath); files.add(dataFilePath); } bytes.clear(); await uploadToSupabase(rawFilePath, filename, supabaseClient, bucket: 'csvhich_archive', timestamped: true, upsert: false); for (var file in files) { final fileProcess = FileProcess(file, supabaseClient); await fileProcess.go(); } } return shelf.Response.ok('File processed and uploaded successfully'); } catch (e, stackTrace) { //print('Error: $e\n$stackTrace'); return shelf.Response.internalServerError( body: 'Error processing upload: $e'); } finally { supabaseClient.dispose(); await File(zipFolder).delete(recursive: true); await File(extFolder).delete(recursive: true); } } } class FileProcess { FileProcess(this.filepath, this.supabase); final String filepath; final SupabaseClient supabase; String get filename => filepath.replaceAll('\\', "/").split("/").last; final Map tables = { "secondprgtype.txt": "aclegs_csv", "ExportPGRGPNmois.txt": "pnlegs_csv", "exportPGRGPN.txt": "pnlegs_csv", "exportlicence.txt": "licences_csv", }; final Map> _headers = { "secondprgtype.txt": [ "leg_no", "fn_carrier", "fn_number", "fn_suffix", "day_of_origin", "ac_owner", "ac_subtype", "ac_version", "ac_registration", "dep_ap_actual", "dep_ap_sched", "dep_dt_est", "dep_sched_dt", "arr_ap_actual", "arr_ap_sched", "arr_dt_est", "arr_sched_dt", "slot_time_actual", "leg_type", "status", "employer_cockpit", "employer_cabin", "cycles", "delay_code_01", "delay_code_02", "delay_code_03", "delay_code_04", "delay_time_01", "delay_time_02", "delay_time_03", "delay_time_04", "subdelay_code_01", "subdelay_code_02", "subdelay_code_03", "subdelay_code_04", "pax_booked_c", "pax_booked_y", "pax_booked_trs_c", "pax_booked_trs_y", "pad_booked_c", "pad_booked_y", "offblock_dt_a", "airborne_dt_a", "landing_dt_a", "onblock_dt_a", "offblock_dt_f", "airborne_dt_f", "landing_dt_f", "onblock_dt_f", "offblock_dt_m", "airborne_dt_m", "landing_dt_m", "onblock_dt_m", "eet", ], "exportPGRGPN.txt": [ "date", "tlc", "actype", "al", "fnum", "ddep", "hdep", "ddes", "hdes", "dep", "des", "label", "type", ], "ExportPGRGPNmois.txt": [ "date", "tlc", "actype", "al", "fnum", "ddep", "hdep", "ddes", "hdes", "dep", "des", "label", "type", ], "exportlicence.txt": [ "tlc", "fname", "mname", "lname", "expire", "ac", "college", "base", ], }; final Map scopes = { "secondprgtype.txt": "day_of_origin", "exportPGRGPN.txt": "date", "ExportPGRGPNmois.txt": "date", "exportlicence.txt": "tlc", }; final Map> idToRemove = { "secondprgtype.txt": ["day_of_origin"], "exportPGRGPN.txt": ["date", "tlc"], "ExportPGRGPNmois.txt": ["date", "tlc"], "exportlicence.txt": ["tlc"], }; final Map> ids = { "secondprgtype.txt": { "table": "aclegs_log", "headers": [ "day_of_origin", "dep_sched_dt", "fn_carrier", "fn_number", "dep_ap_sched", "arr_ap_sched", // "dep_ap_actual", // "arr_ap_actual" ] }, "exportPGRGPN.txt": { "table": "pnlegs_log", "headers": ["tlc", "date", "dep", "des", "al", "fnum,", "label"] }, "ExportPGRGPNmois.txt": { "table": "pnlegs_log", "headers": ["tlc", "date", "dep", "des", "al", "fnum,", "label"] }, "exportlicence.txt": { "table": "qualifs_log", "headers": ["tlc", "college", "ac", "base"] }, }; Future>> parseCsv() async { final headers = _headers[filename] ?? []; if (headers.isEmpty) { throw Exception('No headers found for file: $filename'); } // Initialize an empty list to hold the parsed data List> data = []; // Read the CSV file final file = File(filepath); final lines = await file.readAsLines(); // Iterate over each line in the CSV file for (int i = 0; i < lines.length; i++) { // Split the line into individual values final values = lines[i].split(','); if (values.length != headers.length) { // print('Skipping line $i: Incorrect number of values: line: $i'); continue; } // Create a map for the current row Map row = {}; // Assign each value to the corresponding header for (int j = 0; j < headers.length; j++) { row[headers[j]] = values[j].removeQuotes.trim().nullIfEmpty; } // Add the row map to the data list data.add(row); } // Return the parsed data return data; } List get filesTomonitor => _headers.keys.toList(); Future go() async { if (!filesTomonitor.contains(filename)) return; final mapsToInsert = await parseCsv(); final scopeName = scopes[filename] ?? ""; final scopeInNew = mapsToInsert .fold({}, (t, e) => t..add(e[scopeName] ?? "")).toList(); List> oldIds = []; List> oldComparable = []; //load old data for (var e in splitList(scopeInNew, headerToNb(scopeInNew))) { final res = await supabase .from(tables[filename]!) .select() .inFilter(scopeName, e) .limit(100000); oldIds.addAll(res.map((e) => filterMapByKeys(e, ["id"])).toList()); oldComparable.addAll(res .map((e) => filterMapByKeys(e, _headers[filename] ?? [])) .toList()); } List indexToRemove = []; List indexToMaintain = []; final keys2check = idToRemove[filename] ?? []; for (int i = 0; i < oldComparable.length; i++) { final item = oldComparable[i]; final index = findIndex(mapsToInsert, item); if (index > -1) { indexToMaintain.add(i); mapsToInsert.removeAt(index); } else { // final mawjoudin = oldComparable.fold(>[], (t, e) { // return t..add(filterMapByKeys(e, keys2check)); // }); // final mawjood = mawjoudin // .firstWhere((e) => mapEquals(filterMapByKeys(item, keys2check), e), // orElse: () => {}) // .keys // .isNotEmpty; // if (mawjood) { // indexToRemove.add(i); // } else { // print("saved item: $item"); // } indexToRemove.add(i); } } // removing index to remove with id for (var e in splitList(indexToRemove.map((e) => oldIds[e]['id']).toList(), headerToNb(indexToRemove.map((e) => oldIds[e]['id']).toList()))) { await supabase .from(tables[filename]!) // Replace with your actual table name .delete() .inFilter('id', e); } // insering new data await supabase .from(tables[filename]!) // Replace with your actual table name .insert(mapsToInsert); print( " insert:${mapsToInsert.length} remove:${indexToRemove.length} maintain:${indexToMaintain.length}"); } bool mapEquals(Map map1, Map map2) { //if (map1.length != map2.length) return false; for (var key in map1.keys) { if (map1[key] != map2[key]) return false; } return true; } int findIndex(List list, dynamic element) { for (int i = 0; i < list.length; i++) { if (mapEquals(list[i], element)) { return i; } } return -1; // Return -1 if the element is not found } Map filterMapByKeys( Map originalMap, List keysToInclude) { // Create a new map to hold the filtered results Map filteredMap = {}; // Iterate through the list of keys to include for (String key in keysToInclude) { // Check if the key exists in the original map if (originalMap.containsKey(key)) { filteredMap[key] = originalMap[key]; // Add to the new map } } return filteredMap; } List> splitList(List originalList, int maxSize) { List> sublists = []; for (int i = 0; i < originalList.length; i += maxSize) { // Create a sublist for the current chunk List sublist = originalList.sublist( i, (i + maxSize > originalList.length) ? originalList.length : i + maxSize, ); sublists.add(sublist); } return sublists; } int headerToNb(List list) { //header max of 16k final maxheader = 4 * 1024 * 8; if (list.isEmpty) { return list.length; } final length1 = (list.toString().length / list.length).ceil() * 8; final lengthurl = 200 * 8; final res = ((maxheader - lengthurl) / length1).floor(); //print("header2nb: $res"); return res; } } extension NullIfEmpty on String { String? get nullIfEmpty => isEmpty ? null : this; } extension RemoveQuotes on String { String get removeQuotes { if (isEmpty) return this; // Return if the string is empty // Check if the first character is a quote bool startsWithQuote = startsWith('"') || startsWith("'"); // Check if the last character is a quote bool endsWithQuote = endsWith('"') || endsWith("'"); // Remove the first and last characters if they are quotes String result = this; if (startsWithQuote) result = result.substring(1); if (endsWithQuote) result = result.substring(0, result.length - 1); return result; } }